From dab974b4a83fdbad1f16e75a4aba2f0447379f2d Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Tue, 5 Aug 2025 13:11:43 +0100 Subject: [PATCH 01/43] Adding print_tree method to FlowGraph --- .../flowfile_core/flowfile/flow_graph.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py index 11770b1f..5d5243e9 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_graph.py +++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py @@ -269,6 +269,24 @@ def __repr__(self): settings_str = " -" + '\n -'.join(f"{k}: {v}" for k, v in self.flow_settings) return f"FlowGraph(\nNodes: {self._node_db}\n\nSettings:\n{settings_str}" + def print_tree(self): + """ + Print flow_graph as a tree. + """ + + str_repr = result.flow_graph._node_db + last_key = len(str_repr.keys()) + tree= "" + + for k, v in str_repr.items(): + operation = str(v).split("(")[1][:-1].replace("_", " ").title() + tree += str(operation) + " (id=" + str(k) + "):" + # This is still missing the operation type. Need to check how to access FlowFile object + if k < last_key: + tree += "\n" + "# " + "\t"*(k-1) + "|___ " + + return print(tree) + def get_nodes_overview(self): output = [] for v in self._node_db.values(): From 8aa0b746b6edc2caef4e2c776a3c945b9e949419 Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Wed, 6 Aug 2025 17:35:00 +0100 Subject: [PATCH 02/43] Adding method print_tree() to FlowGraph --- .../flowfile_core/flowfile/flow_graph.py | 61 ++++++++++++++++--- 1 file changed, 51 insertions(+), 10 deletions(-) diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py index 5d5243e9..24bb8602 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_graph.py +++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py @@ -269,21 +269,62 @@ def __repr__(self): settings_str = " -" + '\n -'.join(f"{k}: {v}" for k, v in self.flow_settings) return f"FlowGraph(\nNodes: {self._node_db}\n\nSettings:\n{settings_str}" - def print_tree(self): + def print_tree(self, show_schema=False, show_descriptions=False):: """ Print flow_graph as a tree. """ + max_node_id = max(self._node_db.keys()) - str_repr = result.flow_graph._node_db - last_key = len(str_repr.keys()) - tree= "" + tree = "" + tabs = 0 - for k, v in str_repr.items(): - operation = str(v).split("(")[1][:-1].replace("_", " ").title() - tree += str(operation) + " (id=" + str(k) + "):" - # This is still missing the operation type. Need to check how to access FlowFile object - if k < last_key: - tree += "\n" + "# " + "\t"*(k-1) + "|___ " + for node in self.nodes: + tab_counter += 1 + node_input = node.setting_input + operation = str(self._node_db[node_input.node_id]).split("(")[1][:-1].replace("_", " ").title() + + if operation == "Formula": + operation = "With Columns" + + + tree += str(operation) + " (id=" + str(node_input.node_id) + ")" + + if show_descriptions & show_schema: + raise ValueError('show_descriptions and show_schema cannot be True simultaneously') + if show_descriptions: + tree += ": " + str(node_input.description) + elif show_schema: + tree += " -> [" + if operation == "Manual Input": + schema = ", ".join([str(i.name) + ": " + str(i.data_type) for i in node_input.raw_data_format.columns]) + tree += schema + elif operation == "With Columns": + tree_with_col_schema = ", " + node_input.function.field.name + ": " + node_input.function.field.data_type + tree += schema + tree_with_col_schema + elif operation == "Filter": + index = node_input.filter_input.advanced_filter.find("]") + filtered_column = str(node_input.filter_input.advanced_filter[1:index]) + schema = re.sub('({str(filtered_column)}: [A-Za-z0-9]+\,\s)', "", schema) + tree += schema + elif operation == "Group By": + for col in node_input.groupby_input.agg_cols: + schema = re.sub(str(col.old_name) + ': [a-z0-9]+\, ', "", schema) + tree += schema + tree += "]" + else: + if operation == "Manual Input": + tree += ": " + str(node_input.raw_data_format.data) + elif operation == "With Columns": + tree += ": " + str(node_input.function) + elif operation == "Filter": + tree += ": " + str(node_input.filter_input.advanced_filter) + elif operation == "Group By": + tree += ": groupby=[" + ", ".join([col.old_name for col in node_input.groupby_input.agg_cols if col.agg == "groupby"]) + "], " + tree += "agg=[" + ", ".join([str(col.agg) + "(" + str(col.old_name) + ")" for col in node_input.groupby_input.agg_cols if col.agg != "groupby"]) + "]" + + if node_input.node_id < max_node_id: + tree += "\n" + "# " + " "*3*(tabs-1) + "|___ " + print("\n"*2) return print(tree) From 3ead94ab2de22872f4338c8cb5e22d0b38d340e7 Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Wed, 6 Aug 2025 17:38:54 +0100 Subject: [PATCH 03/43] Adding method print_tree() to FlowGraph --- flowfile_core/flowfile_core/flowfile/flow_graph.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py index 24bb8602..890c16de 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_graph.py +++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py @@ -269,7 +269,7 @@ def __repr__(self): settings_str = " -" + '\n -'.join(f"{k}: {v}" for k, v in self.flow_settings) return f"FlowGraph(\nNodes: {self._node_db}\n\nSettings:\n{settings_str}" - def print_tree(self, show_schema=False, show_descriptions=False):: + def print_tree(self, show_schema=False, show_descriptions=False): """ Print flow_graph as a tree. """ From e1580750a38f0a54fa8317557b831d9996d45bbc Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Mon, 18 Aug 2025 11:46:19 +0100 Subject: [PATCH 04/43] Changes to node ordering in print_tree method --- .../flowfile_core/flowfile/flow_graph.py | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py index 890c16de..a93073a7 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_graph.py +++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py @@ -269,7 +269,7 @@ def __repr__(self): settings_str = " -" + '\n -'.join(f"{k}: {v}" for k, v in self.flow_settings) return f"FlowGraph(\nNodes: {self._node_db}\n\nSettings:\n{settings_str}" - def print_tree(self, show_schema=False, show_descriptions=False): + def print_tree(self): """ Print flow_graph as a tree. """ @@ -278,8 +278,10 @@ def print_tree(self, show_schema=False, show_descriptions=False): tree = "" tabs = 0 - for node in self.nodes: - tab_counter += 1 + ordered_nodes = [i.node_id for i in self.execution_order] + + for node in ordered_nodes: + tabs += 1 node_input = node.setting_input operation = str(self._node_db[node_input.node_id]).split("(")[1][:-1].replace("_", " ").title() @@ -1247,21 +1249,21 @@ def run_graph(self): self.end_datetime = None self.latest_run_info = None self.flow_logger.info('Starting to run flowfile flow...') - skip_nodes = [node for node in self.nodes if not node.is_correct] - skip_nodes.extend([lead_to_node for node in skip_nodes for lead_to_node in node.leads_to_nodes]) - execution_order = determine_execution_order(all_nodes=[node for node in self.nodes if - node not in skip_nodes], + self.skip_nodes = [node for node in self.nodes if not node.is_correct] + self.skip_nodes.extend([lead_to_node for node in self.skip_nodes for lead_to_node in node.leads_to_nodes]) + self.execution_order = determine_execution_order(all_nodes=[node for node in self.nodes if + node not in self.skip_nodes], flow_starts=self._flow_starts+self.get_implicit_starter_nodes()) - skip_node_message(self.flow_logger, skip_nodes) - execution_order_message(self.flow_logger, execution_order) + skip_node_message(self.flow_logger, self.skip_nodes) + execution_order_message(self.flow_logger, self.execution_order) performance_mode = self.flow_settings.execution_mode == 'Performance' - for node in execution_order: + for node in self.execution_order: node_logger = self.flow_logger.get_node_logger(node.node_id) if self.flow_settings.is_canceled: self.flow_logger.info('Flow canceled') break - if node in skip_nodes: + if node in self.skip_nodes: node_logger.info(f'Skipping node {node.node_id}') continue node_result = NodeResult(node_id=node.node_id, node_name=node.name) @@ -1289,7 +1291,7 @@ def run_graph(self): node_result.is_running = False node_logger.error(f'Error in node {node.node_id}: {e}') if not node_result.success: - skip_nodes.extend(list(node.get_all_dependent_nodes())) + self.skip_nodes.extend(list(node.get_all_dependent_nodes())) node_logger.info(f'Completed node with success: {node_result.success}') self.nodes_completed += 1 self.flow_logger.info('Flow completed!') From 667ee1d219656c6907d629476ce4461288f72092 Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Mon, 18 Aug 2025 11:46:19 +0100 Subject: [PATCH 05/43] Changes to node ordering in print_tree method --- .../flowfile_core/flowfile/flow_graph.py | 31 +++++++++---------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py index 6a6113f1..43a8ced0 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_graph.py +++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py @@ -369,7 +369,7 @@ def __repr__(self): settings_str = " -" + '\n -'.join(f"{k}: {v}" for k, v in self.flow_settings) return f"FlowGraph(\nNodes: {self._node_db}\n\nSettings:\n{settings_str}" - def print_tree(self, show_schema=False, show_descriptions=False): + def print_tree(self): """ Print flow_graph as a tree. """ @@ -378,8 +378,10 @@ def print_tree(self, show_schema=False, show_descriptions=False): tree = "" tabs = 0 - for node in self.nodes: - tab_counter += 1 + ordered_nodes = [i.node_id for i in self.execution_order] + + for node in ordered_nodes: + tabs += 1 node_input = node.setting_input operation = str(self._node_db[node_input.node_id]).split("(")[1][:-1].replace("_", " ").title() @@ -1635,24 +1637,21 @@ def run_graph(self) -> RunInformation | None: self.end_datetime = None self.latest_run_info = None self.flow_logger.info('Starting to run flowfile flow...') - skip_nodes = [node for node in self.nodes if not node.is_correct] - skip_nodes.extend([lead_to_node for node in skip_nodes for lead_to_node in node.leads_to_nodes]) - execution_order = determine_execution_order(all_nodes=[node for node in self.nodes if - node not in skip_nodes], + self.skip_nodes = [node for node in self.nodes if not node.is_correct] + self.skip_nodes.extend([lead_to_node for node in self.skip_nodes for lead_to_node in node.leads_to_nodes]) + self.execution_order = determine_execution_order(all_nodes=[node for node in self.nodes if + node not in self.skip_nodes], flow_starts=self._flow_starts+self.get_implicit_starter_nodes()) - skip_node_message(self.flow_logger, skip_nodes) - execution_order_message(self.flow_logger, execution_order) + + skip_node_message(self.flow_logger, self.skip_nodes) + execution_order_message(self.flow_logger, self.execution_order) performance_mode = self.flow_settings.execution_mode == 'Performance' - if self.flow_settings.execution_location == 'local': - OFFLOAD_TO_WORKER.value = False - elif self.flow_settings.execution_location == 'remote': - OFFLOAD_TO_WORKER.value = True - for node in execution_order: + for node in self.execution_order: node_logger = self.flow_logger.get_node_logger(node.node_id) if self.flow_settings.is_canceled: self.flow_logger.info('Flow canceled') break - if node in skip_nodes: + if node in self.skip_nodes: node_logger.info(f'Skipping node {node.node_id}') continue node_result = NodeResult(node_id=node.node_id, node_name=node.name) @@ -1680,7 +1679,7 @@ def run_graph(self) -> RunInformation | None: node_result.is_running = False node_logger.error(f'Error in node {node.node_id}: {e}') if not node_result.success: - skip_nodes.extend(list(node.get_all_dependent_nodes())) + self.skip_nodes.extend(list(node.get_all_dependent_nodes())) node_logger.info(f'Completed node with success: {node_result.success}') self.nodes_completed += 1 self.flow_logger.info('Flow completed!') From f9702fa1c41b0e755fe9b6dceca8beea5c414cc7 Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Mon, 18 Aug 2025 12:07:22 +0100 Subject: [PATCH 06/43] Changes to node ordering of print_tree --- .../flowfile_core/flowfile/flow_graph.py | 98 +------------------ 1 file changed, 3 insertions(+), 95 deletions(-) diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py index 43a8ced0..1bf6ec28 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_graph.py +++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py @@ -243,64 +243,6 @@ def placeholder(n: FlowNode = None): self.add_node_step(node_id=node_promise.node_id, node_type=node_promise.node_type, function=placeholder, setting_input=node_promise) - def print_tree(self, show_schema=False, show_descriptions=False): - """ - Print flow_graph as a tree. - """ - max_node_id = max(self._node_db.keys()) - - tree = "" - tabs = 0 - tab_counter = 0 - for node in self.nodes: - tab_counter += 1 - node_input = node.setting_input - operation = str(self._node_db[node_input.node_id]).split("(")[1][:-1].replace("_", " ").title() - - if operation == "Formula": - operation = "With Columns" - - tree += str(operation) + " (id=" + str(node_input.node_id) + ")" - - if show_descriptions & show_schema: - raise ValueError('show_descriptions and show_schema cannot be True simultaneously') - if show_descriptions: - tree += ": " + str(node_input.description) - elif show_schema: - tree += " -> [" - if operation == "Manual Input": - schema = ", ".join([str(i.name) + ": " + str(i.data_type) for i in node_input.raw_data_format.columns]) - tree += schema - elif operation == "With Columns": - tree_with_col_schema = ", " + node_input.function.field.name + ": " + node_input.function.field.data_type - tree += schema + tree_with_col_schema - elif operation == "Filter": - index = node_input.filter_input.advanced_filter.find("]") - filtered_column = str(node_input.filter_input.advanced_filter[1:index]) - schema = re.sub('({str(filtered_column)}: [A-Za-z0-9]+\,\s)', "", schema) - tree += schema - elif operation == "Group By": - for col in node_input.groupby_input.agg_cols: - schema = re.sub(str(col.old_name) + ': [a-z0-9]+\, ', "", schema) - tree += schema - tree += "]" - else: - if operation == "Manual Input": - tree += ": " + str(node_input.raw_data_format.data) - elif operation == "With Columns": - tree += ": " + str(node_input.function) - elif operation == "Filter": - tree += ": " + str(node_input.filter_input.advanced_filter) - elif operation == "Group By": - tree += ": groupby=[" + ", ".join([col.old_name for col in node_input.groupby_input.agg_cols if col.agg == "groupby"]) + "], " - tree += "agg=[" + ", ".join([str(col.agg) + "(" + str(col.old_name) + ")" for col in node_input.groupby_input.agg_cols if col.agg != "groupby"]) + "]" - - if node_input.node_id < max_node_id: - tree += "\n" + "# " + " "*3*(tabs-1) + "|___ " - print("\n"*2) - - return print(tree) - def apply_layout(self, y_spacing: int = 150, x_spacing: int = 200, initial_y: int = 100): """Calculates and applies a layered layout to all nodes in the graph. @@ -383,47 +325,13 @@ def print_tree(self): for node in ordered_nodes: tabs += 1 node_input = node.setting_input - operation = str(self._node_db[node_input.node_id]).split("(")[1][:-1].replace("_", " ").title() - - if operation == "Formula": - operation = "With Columns" - + operation = str(pipeline.flow_graph._node_db[node_input.node_id]).split("(")[1][:-1].replace("_", " ").title() tree += str(operation) + " (id=" + str(node_input.node_id) + ")" - if show_descriptions & show_schema: - raise ValueError('show_descriptions and show_schema cannot be True simultaneously') - if show_descriptions: + if node_input.description: tree += ": " + str(node_input.description) - elif show_schema: - tree += " -> [" - if operation == "Manual Input": - schema = ", ".join([str(i.name) + ": " + str(i.data_type) for i in node_input.raw_data_format.columns]) - tree += schema - elif operation == "With Columns": - tree_with_col_schema = ", " + node_input.function.field.name + ": " + node_input.function.field.data_type - tree += schema + tree_with_col_schema - elif operation == "Filter": - index = node_input.filter_input.advanced_filter.find("]") - filtered_column = str(node_input.filter_input.advanced_filter[1:index]) - schema = re.sub('({str(filtered_column)}: [A-Za-z0-9]+\,\s)', "", schema) - tree += schema - elif operation == "Group By": - for col in node_input.groupby_input.agg_cols: - schema = re.sub(str(col.old_name) + ': [a-z0-9]+\, ', "", schema) - tree += schema - tree += "]" - else: - if operation == "Manual Input": - tree += ": " + str(node_input.raw_data_format.data) - elif operation == "With Columns": - tree += ": " + str(node_input.function) - elif operation == "Filter": - tree += ": " + str(node_input.filter_input.advanced_filter) - elif operation == "Group By": - tree += ": groupby=[" + ", ".join([col.old_name for col in node_input.groupby_input.agg_cols if col.agg == "groupby"]) + "], " - tree += "agg=[" + ", ".join([str(col.agg) + "(" + str(col.old_name) + ")" for col in node_input.groupby_input.agg_cols if col.agg != "groupby"]) + "]" - + if node_input.node_id < max_node_id: tree += "\n" + "# " + " "*3*(tabs-1) + "|___ " print("\n"*2) From d98ad7bdfac6909f9a0d3898f3e48438295a4a30 Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Mon, 18 Aug 2025 12:18:21 +0100 Subject: [PATCH 07/43] Changes to node ordering of print_tree --- flowfile_core/flowfile_core/flowfile/flow_graph.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py index 1bf6ec28..489d3834 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_graph.py +++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py @@ -227,6 +227,11 @@ def __init__(self, elif input_flow is not None: self.add_datasource(input_file=input_flow) + self.skip_nodes = [node for node in self.nodes if not node.is_correct] + self.skip_nodes.extend([lead_to_node for node in self.skip_nodes for lead_to_node in node.leads_to_nodes]) + self.execution_order = determine_execution_order(all_nodes=[node for node in self.nodes if + node not in self.skip_nodes],flow_starts=self._flow_starts+self.get_implicit_starter_nodes()) + def add_node_promise(self, node_promise: input_schema.NodePromise): """Adds a placeholder node to the graph that is not yet fully configured. @@ -1545,11 +1550,6 @@ def run_graph(self) -> RunInformation | None: self.end_datetime = None self.latest_run_info = None self.flow_logger.info('Starting to run flowfile flow...') - self.skip_nodes = [node for node in self.nodes if not node.is_correct] - self.skip_nodes.extend([lead_to_node for node in self.skip_nodes for lead_to_node in node.leads_to_nodes]) - self.execution_order = determine_execution_order(all_nodes=[node for node in self.nodes if - node not in self.skip_nodes], - flow_starts=self._flow_starts+self.get_implicit_starter_nodes()) skip_node_message(self.flow_logger, self.skip_nodes) execution_order_message(self.flow_logger, self.execution_order) From 514384977b15fd1816c424f8dbbd4e84d1538f60 Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Mon, 18 Aug 2025 16:38:18 +0100 Subject: [PATCH 08/43] retrigger checks From 83a72d1fd3caa45e1c247ad1f2df1915e96d4a60 Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Wed, 20 Aug 2025 10:50:57 +0100 Subject: [PATCH 09/43] Adding tests for print_tree method --- .../flowfile_core/flowfile/flow_graph.py | 2 +- .../tests/flowfile/test_code_generator.py | 80 +++++++++++++++++++ 2 files changed, 81 insertions(+), 1 deletion(-) diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py index a7815e82..9b6690ce 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_graph.py +++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py @@ -330,7 +330,7 @@ def print_tree(self): for node in ordered_nodes: tabs += 1 node_input = node.setting_input - operation = str(pipeline.flow_graph._node_db[node_input.node_id]).split("(")[1][:-1].replace("_", " ").title() + operation = str(self._node_db[node_input.node_id]).split("(")[1][:-1].replace("_", " ").title() tree += str(operation) + " (id=" + str(node_input.node_id) + ")" diff --git a/flowfile_core/tests/flowfile/test_code_generator.py b/flowfile_core/tests/flowfile/test_code_generator.py index 3177447e..802e8a85 100644 --- a/flowfile_core/tests/flowfile/test_code_generator.py +++ b/flowfile_core/tests/flowfile/test_code_generator.py @@ -2379,6 +2379,86 @@ def test_aggregation_functions(): expected_df = flow.get_node(2).get_resulting_data().data_frame assert_frame_equal(result_df, expected_df, check_row_order=False) +def test_print_tree(): + """ Test made to the FlowGraph's print_tree method""" + flow = create_basic_flow() + + manual_input_data_1 = [ + [1, 2, 3], + ["North", "South", "East"], + [10, 5, 8], + [150, 300, 200] +] + +manual_input_data_2 = [ + [4, 5, 6], + ["West", "North", "South"], + [15, 12, 7], + [100, 200, 250] +] + +manual_input_data_3 = [ + [7, 8, 9], + ["East", "West", "North"], + [10, 6, 3], + [180, 220, 350] +] + +manual_input_data_4 = [ + [10, 11, 12], + ["South", "East", "West"], + [9, 4, 7], + [280, 260, 230] +] + + +for i,j in zip([manual_input_data_1,manual_input_data_2,manual_input_data_3,manual_input_data_4], range(1,5)): + data = input_schema.NodeManualInput( + flow_id=1, + node_id=j, + raw_data_format=input_schema.RawData( + columns=[ + input_schema.MinimalFieldInfo(name="id", data_type="Integer"), + input_schema.MinimalFieldInfo(name="region", data_type="String"), + input_schema.MinimalFieldInfo(name="quantity", data_type="Integer"), + input_schema.MinimalFieldInfo(name="price", data_type="Integer") + ], + data=i + ) + ) + graph.add_manual_input(data) + +# Add union node +union_node = input_schema.NodeUnion( + flow_id=1, + node_id=5, + depending_on_ids=[1, 2, 3, 4], + union_input=transform_schema.UnionInput(mode="relaxed") +) +graph.add_union(union_node) +for i in range(1, 5): + connection = input_schema.NodeConnection.create_from_simple_input(i, 5, 'main') + add_connection(graph, connection) + + + +# Add group by node +groupby_node = input_schema.NodeGroupBy( + flow_id=1, + node_id=6, + depending_on_id=1, + groupby_input=transform_schema.GroupByInput( + agg_cols=[ + transform_schema.AggColl("region", "groupby"), + transform_schema.AggColl("quantity", "sum", "total_quantity"), + transform_schema.AggColl("price", "mean", "avg_price"), + transform_schema.AggColl("quantity", "count", "num_transactions") + ] + ) +) +graph.add_group_by(groupby_node) +add_connection(graph, node_connection=input_schema.NodeConnection.create_from_simple_input(5, 6, 'main')) + def test_flow_with_disconnected_nodes(): """Test a flow where some nodes might not be connected properly""" From df0964472cb37c73429cbf942c62f7c4391d856a Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Wed, 20 Aug 2025 10:55:34 +0100 Subject: [PATCH 10/43] Adding tests for print_tree method --- .../tests/flowfile/test_code_generator.py | 86 +++++++++---------- 1 file changed, 43 insertions(+), 43 deletions(-) diff --git a/flowfile_core/tests/flowfile/test_code_generator.py b/flowfile_core/tests/flowfile/test_code_generator.py index 802e8a85..84cd507e 100644 --- a/flowfile_core/tests/flowfile/test_code_generator.py +++ b/flowfile_core/tests/flowfile/test_code_generator.py @@ -2412,52 +2412,52 @@ def test_print_tree(): ] -for i,j in zip([manual_input_data_1,manual_input_data_2,manual_input_data_3,manual_input_data_4], range(1,5)): - data = input_schema.NodeManualInput( - flow_id=1, - node_id=j, - raw_data_format=input_schema.RawData( - columns=[ - input_schema.MinimalFieldInfo(name="id", data_type="Integer"), - input_schema.MinimalFieldInfo(name="region", data_type="String"), - input_schema.MinimalFieldInfo(name="quantity", data_type="Integer"), - input_schema.MinimalFieldInfo(name="price", data_type="Integer") - ], - data=i + for i,j in zip([manual_input_data_1,manual_input_data_2,manual_input_data_3,manual_input_data_4], range(1,5)): + data = input_schema.NodeManualInput( + flow_id=1, + node_id=j, + raw_data_format=input_schema.RawData( + columns=[ + input_schema.MinimalFieldInfo(name="id", data_type="Integer"), + input_schema.MinimalFieldInfo(name="region", data_type="String"), + input_schema.MinimalFieldInfo(name="quantity", data_type="Integer"), + input_schema.MinimalFieldInfo(name="price", data_type="Integer") + ], + data=i + ) ) + graph.add_manual_input(data) + + # Add union node + union_node = input_schema.NodeUnion( + flow_id=1, + node_id=5, + depending_on_ids=[1, 2, 3, 4], + union_input=transform_schema.UnionInput(mode="relaxed") + ) + graph.add_union(union_node) + for i in range(1, 5): + connection = input_schema.NodeConnection.create_from_simple_input(i, 5, 'main') + add_connection(graph, connection) + + + + # Add group by node + groupby_node = input_schema.NodeGroupBy( + flow_id=1, + node_id=6, + depending_on_id=1, + groupby_input=transform_schema.GroupByInput( + agg_cols=[ + transform_schema.AggColl("region", "groupby"), + transform_schema.AggColl("quantity", "sum", "total_quantity"), + transform_schema.AggColl("price", "mean", "avg_price"), + transform_schema.AggColl("quantity", "count", "num_transactions") + ] ) - graph.add_manual_input(data) - -# Add union node -union_node = input_schema.NodeUnion( - flow_id=1, - node_id=5, - depending_on_ids=[1, 2, 3, 4], - union_input=transform_schema.UnionInput(mode="relaxed") -) -graph.add_union(union_node) -for i in range(1, 5): - connection = input_schema.NodeConnection.create_from_simple_input(i, 5, 'main') - add_connection(graph, connection) - - - -# Add group by node -groupby_node = input_schema.NodeGroupBy( - flow_id=1, - node_id=6, - depending_on_id=1, - groupby_input=transform_schema.GroupByInput( - agg_cols=[ - transform_schema.AggColl("region", "groupby"), - transform_schema.AggColl("quantity", "sum", "total_quantity"), - transform_schema.AggColl("price", "mean", "avg_price"), - transform_schema.AggColl("quantity", "count", "num_transactions") - ] ) -) -graph.add_group_by(groupby_node) -add_connection(graph, node_connection=input_schema.NodeConnection.create_from_simple_input(5, 6, 'main')) + graph.add_group_by(groupby_node) + add_connection(graph, node_connection=input_schema.NodeConnection.create_from_simple_input(5, 6, 'main')) def test_flow_with_disconnected_nodes(): From 548729818f64dcf298fd37818b1547d38207667d Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Wed, 20 Aug 2025 10:59:21 +0100 Subject: [PATCH 11/43] Adding tests for print_tree method --- .../tests/flowfile/test_code_generator.py | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/flowfile_core/tests/flowfile/test_code_generator.py b/flowfile_core/tests/flowfile/test_code_generator.py index 84cd507e..be990b40 100644 --- a/flowfile_core/tests/flowfile/test_code_generator.py +++ b/flowfile_core/tests/flowfile/test_code_generator.py @@ -2413,20 +2413,20 @@ def test_print_tree(): for i,j in zip([manual_input_data_1,manual_input_data_2,manual_input_data_3,manual_input_data_4], range(1,5)): - data = input_schema.NodeManualInput( - flow_id=1, - node_id=j, - raw_data_format=input_schema.RawData( - columns=[ - input_schema.MinimalFieldInfo(name="id", data_type="Integer"), - input_schema.MinimalFieldInfo(name="region", data_type="String"), - input_schema.MinimalFieldInfo(name="quantity", data_type="Integer"), - input_schema.MinimalFieldInfo(name="price", data_type="Integer") - ], - data=i - ) + data = input_schema.NodeManualInput( + flow_id=1, + node_id=j, + raw_data_format=input_schema.RawData( + columns=[ + input_schema.MinimalFieldInfo(name="id", data_type="Integer"), + input_schema.MinimalFieldInfo(name="region", data_type="String"), + input_schema.MinimalFieldInfo(name="quantity", data_type="Integer"), + input_schema.MinimalFieldInfo(name="price", data_type="Integer") + ], + data=i ) - graph.add_manual_input(data) + ) + graph.add_manual_input(data) # Add union node union_node = input_schema.NodeUnion( From 90e237f7c11cbd5d938627acbba110c77c27b0aa Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Wed, 20 Aug 2025 11:07:13 +0100 Subject: [PATCH 12/43] Adding tests for print_tree method --- .../tests/flowfile/test_code_generator.py | 39 ++++++++++--------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/flowfile_core/tests/flowfile/test_code_generator.py b/flowfile_core/tests/flowfile/test_code_generator.py index be990b40..c3583661 100644 --- a/flowfile_core/tests/flowfile/test_code_generator.py +++ b/flowfile_core/tests/flowfile/test_code_generator.py @@ -2381,6 +2381,7 @@ def test_aggregation_functions(): def test_print_tree(): """ Test made to the FlowGraph's print_tree method""" + flow = create_basic_flow() manual_input_data_1 = [ @@ -2388,28 +2389,28 @@ def test_print_tree(): ["North", "South", "East"], [10, 5, 8], [150, 300, 200] -] + ] -manual_input_data_2 = [ - [4, 5, 6], - ["West", "North", "South"], - [15, 12, 7], - [100, 200, 250] -] + manual_input_data_2 = [ + [4, 5, 6], + ["West", "North", "South"], + [15, 12, 7], + [100, 200, 250] + ] -manual_input_data_3 = [ - [7, 8, 9], - ["East", "West", "North"], - [10, 6, 3], - [180, 220, 350] -] + manual_input_data_3 = [ + [7, 8, 9], + ["East", "West", "North"], + [10, 6, 3], + [180, 220, 350] + ] -manual_input_data_4 = [ - [10, 11, 12], - ["South", "East", "West"], - [9, 4, 7], - [280, 260, 230] -] + manual_input_data_4 = [ + [10, 11, 12], + ["South", "East", "West"], + [9, 4, 7], + [280, 260, 230] + ] for i,j in zip([manual_input_data_1,manual_input_data_2,manual_input_data_3,manual_input_data_4], range(1,5)): From f3a70d9706780998a05ad859ff8c5c86b8ba5adb Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Wed, 20 Aug 2025 11:17:32 +0100 Subject: [PATCH 13/43] Adding tests for print_tree method --- .../tests/flowfile/test_code_generator.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/flowfile_core/tests/flowfile/test_code_generator.py b/flowfile_core/tests/flowfile/test_code_generator.py index c3583661..6401bd9b 100644 --- a/flowfile_core/tests/flowfile/test_code_generator.py +++ b/flowfile_core/tests/flowfile/test_code_generator.py @@ -2399,7 +2399,7 @@ def test_print_tree(): ] manual_input_data_3 = [ - [7, 8, 9], + [7, 8, 9], ["East", "West", "North"], [10, 6, 3], [180, 220, 350] @@ -2412,7 +2412,6 @@ def test_print_tree(): [280, 260, 230] ] - for i,j in zip([manual_input_data_1,manual_input_data_2,manual_input_data_3,manual_input_data_4], range(1,5)): data = input_schema.NodeManualInput( flow_id=1, @@ -2427,7 +2426,7 @@ def test_print_tree(): data=i ) ) - graph.add_manual_input(data) + flow.add_manual_input(data) # Add union node union_node = input_schema.NodeUnion( @@ -2436,12 +2435,10 @@ def test_print_tree(): depending_on_ids=[1, 2, 3, 4], union_input=transform_schema.UnionInput(mode="relaxed") ) - graph.add_union(union_node) + flow.add_union(union_node) for i in range(1, 5): connection = input_schema.NodeConnection.create_from_simple_input(i, 5, 'main') - add_connection(graph, connection) - - + add_connection(flow, connection) # Add group by node groupby_node = input_schema.NodeGroupBy( @@ -2457,8 +2454,8 @@ def test_print_tree(): ] ) ) - graph.add_group_by(groupby_node) - add_connection(graph, node_connection=input_schema.NodeConnection.create_from_simple_input(5, 6, 'main')) + flow.add_group_by(groupby_node) + add_connection(flow, node_connection=input_schema.NodeConnection.create_from_simple_input(5, 6, 'main')) def test_flow_with_disconnected_nodes(): From 128e14c1a180ecc51e5f76c7532d522ac45dbeec Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Wed, 20 Aug 2025 12:31:51 +0100 Subject: [PATCH 14/43] Adding tests for print_tree method --- flowfile_core/tests/flowfile/test_code_generator.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/flowfile_core/tests/flowfile/test_code_generator.py b/flowfile_core/tests/flowfile/test_code_generator.py index 6401bd9b..59707bbd 100644 --- a/flowfile_core/tests/flowfile/test_code_generator.py +++ b/flowfile_core/tests/flowfile/test_code_generator.py @@ -2457,6 +2457,15 @@ def test_print_tree(): flow.add_group_by(groupby_node) add_connection(flow, node_connection=input_schema.NodeConnection.create_from_simple_input(5, 6, 'main')) + verify_code_contains(code, + "df_1 = pl.LazyFrame(", + "df_2 = pl.LazyFrame(", + "df_3 = pl.LazyFrame(", + "df_4 = pl.LazyFrame(", + "df_5 = df_1.union(", + "df_6 = df_5.group_by(", + ) + def test_flow_with_disconnected_nodes(): """Test a flow where some nodes might not be connected properly""" From fc6c7871376c9b5ad18456a43553fc8ead565606 Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Wed, 20 Aug 2025 12:32:54 +0100 Subject: [PATCH 15/43] Adding tests for print_tree method --- flowfile_core/tests/flowfile/test_code_generator.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/flowfile_core/tests/flowfile/test_code_generator.py b/flowfile_core/tests/flowfile/test_code_generator.py index 59707bbd..b3676937 100644 --- a/flowfile_core/tests/flowfile/test_code_generator.py +++ b/flowfile_core/tests/flowfile/test_code_generator.py @@ -2457,6 +2457,11 @@ def test_print_tree(): flow.add_group_by(groupby_node) add_connection(flow, node_connection=input_schema.NodeConnection.create_from_simple_input(5, 6, 'main')) + code = export_flow_to_polars(flow) + print("\n"*2) + print(code) + print("\n"*2) + verify_code_contains(code, "df_1 = pl.LazyFrame(", "df_2 = pl.LazyFrame(", From c8b2a4556675baf9695fdc4d985a5feb7711c0e5 Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Wed, 20 Aug 2025 12:46:27 +0100 Subject: [PATCH 16/43] Adding tests for print_tree method --- flowfile_core/tests/flowfile/test_code_generator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowfile_core/tests/flowfile/test_code_generator.py b/flowfile_core/tests/flowfile/test_code_generator.py index b3676937..1a85ce26 100644 --- a/flowfile_core/tests/flowfile/test_code_generator.py +++ b/flowfile_core/tests/flowfile/test_code_generator.py @@ -2467,7 +2467,7 @@ def test_print_tree(): "df_2 = pl.LazyFrame(", "df_3 = pl.LazyFrame(", "df_4 = pl.LazyFrame(", - "df_5 = df_1.union(", + "df_5 = pl.concat(", "df_6 = df_5.group_by(", ) From efcedca3927dfb3cc44aca4ae94cf5e5dc3255bc Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Wed, 20 Aug 2025 12:48:19 +0100 Subject: [PATCH 17/43] Adding tests for print_tree method --- flowfile_core/tests/flowfile/test_code_generator.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flowfile_core/tests/flowfile/test_code_generator.py b/flowfile_core/tests/flowfile/test_code_generator.py index 1a85ce26..bc7b6a08 100644 --- a/flowfile_core/tests/flowfile/test_code_generator.py +++ b/flowfile_core/tests/flowfile/test_code_generator.py @@ -2458,9 +2458,6 @@ def test_print_tree(): add_connection(flow, node_connection=input_schema.NodeConnection.create_from_simple_input(5, 6, 'main')) code = export_flow_to_polars(flow) - print("\n"*2) - print(code) - print("\n"*2) verify_code_contains(code, "df_1 = pl.LazyFrame(", @@ -2470,7 +2467,10 @@ def test_print_tree(): "df_5 = pl.concat(", "df_6 = df_5.group_by(", ) - + verify_if_execute(code) + result_df = get_result_from_generated_code(code) + expected_df = flow.get_node(6).get_resulting_data().data_frame + assert_frame_equal(result_df, expected_df, check_row_order=False) def test_flow_with_disconnected_nodes(): """Test a flow where some nodes might not be connected properly""" From 25b3aa04b5623217b36842adc98ede74ce3b64bf Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Wed, 20 Aug 2025 13:17:25 +0100 Subject: [PATCH 18/43] Merging skip_nodes and determine_execution_order --- flowfile_core/flowfile_core/flowfile/flow_graph.py | 13 +++---------- .../flowfile/util/execution_orderer.py | 9 +++++++++ 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py index 9b6690ce..db43a5be 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_graph.py +++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py @@ -33,7 +33,7 @@ from flowfile_core.flowfile.utils import snake_case_to_camel_case from flowfile_core.flowfile.analytics.utils import create_graphic_walker_node_from_node_promise from flowfile_core.flowfile.flow_node.flow_node import FlowNode -from flowfile_core.flowfile.util.execution_orderer import determine_execution_order +from flowfile_core.flowfile.util.execution_orderer import compute_execution_order from flowfile_core.flowfile.flow_data_engine.polars_code_parser import polars_code_parser from flowfile_core.flowfile.flow_data_engine.subprocess_operations.subprocess_operations import (ExternalDatabaseFetcher, ExternalDatabaseWriter, @@ -227,10 +227,7 @@ def __init__(self, elif input_flow is not None: self.add_datasource(input_file=input_flow) - self.skip_nodes = [node for node in self.nodes if not node.is_correct] - self.skip_nodes.extend([lead_to_node for node in self.skip_nodes for lead_to_node in node.leads_to_nodes]) - self.execution_order = determine_execution_order(all_nodes=[node for node in self.nodes if - node not in self.skip_nodes],flow_starts=self._flow_starts+self.get_implicit_starter_nodes()) + self.skip_nodes, self.execution_order = compute_execution_order(nodes=self.nodes,flow_starts=self._flow_starts+self.get_implicit_starter_nodes()) def add_node_promise(self, node_promise: input_schema.NodePromise): """Adds a placeholder node to the graph that is not yet fully configured. @@ -1550,11 +1547,7 @@ def run_graph(self) -> RunInformation | None: self.end_datetime = None self.latest_run_info = None self.flow_logger.info('Starting to run flowfile flow...') - self.skip_nodes = [node for node in self.nodes if not node.is_correct] - self.skip_nodes.extend([lead_to_node for node in self.skip_nodes for lead_to_node in node.leads_to_nodes]) - self.execution_order = determine_execution_order(all_nodes=[node for node in self.nodes if - node not in self.skip_nodes], - flow_starts=self._flow_starts+self.get_implicit_starter_nodes()) + self.skip_nodes, self.execution_order = compute_execution_order(nodes=self.nodes, flow_starts=self._flow_starts+self.get_implicit_starter_nodes()) skip_node_message(self.flow_logger, self.skip_nodes) execution_order_message(self.flow_logger, self.execution_order) diff --git a/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py b/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py index 1b20ea49..561d9147 100644 --- a/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py +++ b/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py @@ -2,6 +2,15 @@ from flowfile_core.flowfile.flow_node.flow_node import FlowNode from flowfile_core.configs import logger from collections import deque, defaultdict +from node_skipper import determine_nodes_to_skip + + +def compute_execution_order(nodes: List[FlowNode], flow_starts: List[FlowNode] = None): + skip_nodes = determine_nodes_to_skip(nodes=nodes) + computed_execution_order = determine_execution_order(all_nodes=[node for node in nodes if node not in skip_nodes], + flow_starts=flow_starts) + return skip_nodes, computed_execution_order + def determine_execution_order(all_nodes: List[FlowNode], flow_starts: List[FlowNode] = None) -> List[FlowNode]: From f913b4f4198df1f2d18e316ed0bcc1346dcd6741 Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Wed, 20 Aug 2025 13:20:04 +0100 Subject: [PATCH 19/43] Merging skip_nodes and determine_execution_order --- flowfile_core/flowfile_core/flowfile/flow_graph.py | 6 +++--- .../flowfile_core/flowfile/util/execution_orderer.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py index db43a5be..d7ccf8e6 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_graph.py +++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py @@ -33,7 +33,7 @@ from flowfile_core.flowfile.utils import snake_case_to_camel_case from flowfile_core.flowfile.analytics.utils import create_graphic_walker_node_from_node_promise from flowfile_core.flowfile.flow_node.flow_node import FlowNode -from flowfile_core.flowfile.util.execution_orderer import compute_execution_order +from flowfile_core.flowfile.util.execution_orderer import compute_skip_nodes_execution_order from flowfile_core.flowfile.flow_data_engine.polars_code_parser import polars_code_parser from flowfile_core.flowfile.flow_data_engine.subprocess_operations.subprocess_operations import (ExternalDatabaseFetcher, ExternalDatabaseWriter, @@ -227,7 +227,7 @@ def __init__(self, elif input_flow is not None: self.add_datasource(input_file=input_flow) - self.skip_nodes, self.execution_order = compute_execution_order(nodes=self.nodes,flow_starts=self._flow_starts+self.get_implicit_starter_nodes()) + self.skip_nodes, self.execution_order = compute_skip_nodes_execution_order(nodes=self.nodes,flow_starts=self._flow_starts+self.get_implicit_starter_nodes()) def add_node_promise(self, node_promise: input_schema.NodePromise): """Adds a placeholder node to the graph that is not yet fully configured. @@ -1547,7 +1547,7 @@ def run_graph(self) -> RunInformation | None: self.end_datetime = None self.latest_run_info = None self.flow_logger.info('Starting to run flowfile flow...') - self.skip_nodes, self.execution_order = compute_execution_order(nodes=self.nodes, flow_starts=self._flow_starts+self.get_implicit_starter_nodes()) + self.skip_nodes, self.execution_order = compute_skip_nodes_execution_order(nodes=self.nodes, flow_starts=self._flow_starts+self.get_implicit_starter_nodes()) skip_node_message(self.flow_logger, self.skip_nodes) execution_order_message(self.flow_logger, self.execution_order) diff --git a/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py b/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py index 561d9147..344e3f19 100644 --- a/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py +++ b/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py @@ -5,7 +5,7 @@ from node_skipper import determine_nodes_to_skip -def compute_execution_order(nodes: List[FlowNode], flow_starts: List[FlowNode] = None): +def compute_skip_nodes_execution_order(nodes: List[FlowNode], flow_starts: List[FlowNode] = None): skip_nodes = determine_nodes_to_skip(nodes=nodes) computed_execution_order = determine_execution_order(all_nodes=[node for node in nodes if node not in skip_nodes], flow_starts=flow_starts) From fcfb487ac3a35c0f4c6f01a1de12f98cdf4a9704 Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Wed, 20 Aug 2025 13:23:54 +0100 Subject: [PATCH 20/43] Merging skip_nodes and determine_execution_order --- flowfile_core/flowfile_core/flowfile/util/execution_orderer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py b/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py index 344e3f19..e80d8fc2 100644 --- a/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py +++ b/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py @@ -2,7 +2,7 @@ from flowfile_core.flowfile.flow_node.flow_node import FlowNode from flowfile_core.configs import logger from collections import deque, defaultdict -from node_skipper import determine_nodes_to_skip +from .node_skipper import determine_nodes_to_skip def compute_skip_nodes_execution_order(nodes: List[FlowNode], flow_starts: List[FlowNode] = None): From ba95c9ef75b7cbf1f648cf19b958a89e58d50968 Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Wed, 20 Aug 2025 13:31:38 +0100 Subject: [PATCH 21/43] Merging skip_nodes and determine_execution_order --- .../flowfile_core/flowfile/util/execution_orderer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py b/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py index e80d8fc2..75b21243 100644 --- a/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py +++ b/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py @@ -2,11 +2,11 @@ from flowfile_core.flowfile.flow_node.flow_node import FlowNode from flowfile_core.configs import logger from collections import deque, defaultdict -from .node_skipper import determine_nodes_to_skip +import node_skipper def compute_skip_nodes_execution_order(nodes: List[FlowNode], flow_starts: List[FlowNode] = None): - skip_nodes = determine_nodes_to_skip(nodes=nodes) + skip_nodes = node_skipper.determine_nodes_to_skip(nodes=nodes) computed_execution_order = determine_execution_order(all_nodes=[node for node in nodes if node not in skip_nodes], flow_starts=flow_starts) return skip_nodes, computed_execution_order From 503fdd26ca757d98e8977c99c0a676eca6df6618 Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Wed, 20 Aug 2025 13:39:05 +0100 Subject: [PATCH 22/43] Merging skip_nodes and determine_execution_order --- .../flowfile_core/flowfile/util/execution_orderer.py | 5 ++--- flowfile_core/flowfile_core/flowfile/util/node_skipper.py | 7 +++++++ 2 files changed, 9 insertions(+), 3 deletions(-) create mode 100644 flowfile_core/flowfile_core/flowfile/util/node_skipper.py diff --git a/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py b/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py index 75b21243..02b75bb2 100644 --- a/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py +++ b/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py @@ -2,11 +2,10 @@ from flowfile_core.flowfile.flow_node.flow_node import FlowNode from flowfile_core.configs import logger from collections import deque, defaultdict -import node_skipper - +from flowfile_core.flowfile.util.node_skipper import determine_nodes_to_skip def compute_skip_nodes_execution_order(nodes: List[FlowNode], flow_starts: List[FlowNode] = None): - skip_nodes = node_skipper.determine_nodes_to_skip(nodes=nodes) + skip_nodes = determine_nodes_to_skip(nodes=nodes) computed_execution_order = determine_execution_order(all_nodes=[node for node in nodes if node not in skip_nodes], flow_starts=flow_starts) return skip_nodes, computed_execution_order diff --git a/flowfile_core/flowfile_core/flowfile/util/node_skipper.py b/flowfile_core/flowfile_core/flowfile/util/node_skipper.py new file mode 100644 index 00000000..37fbeac1 --- /dev/null +++ b/flowfile_core/flowfile_core/flowfile/util/node_skipper.py @@ -0,0 +1,7 @@ +from typing import List +from flowfile_core.flowfile.flow_node.flow_node import FlowNode + +def determine_nodes_to_skip(nodes : List[FlowNode]) -> List[FlowNode]: + skip_nodes = [node for node in nodes if not node.is_correct] + skip_nodes.extend([lead_to_node for node in skip_nodes for lead_to_node in node.leads_to_nodes]) + return skip_nodes \ No newline at end of file From 6ec7f0fac68419cc328e239080cb676aa03cd9c4 Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Wed, 20 Aug 2025 13:40:04 +0100 Subject: [PATCH 23/43] Merging skip_nodes and determine_execution_order --- flowfile_core/flowfile_core/flowfile/util/execution_orderer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py b/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py index 02b75bb2..85537a53 100644 --- a/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py +++ b/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py @@ -2,7 +2,7 @@ from flowfile_core.flowfile.flow_node.flow_node import FlowNode from flowfile_core.configs import logger from collections import deque, defaultdict -from flowfile_core.flowfile.util.node_skipper import determine_nodes_to_skip +from node_skipper import determine_nodes_to_skip def compute_skip_nodes_execution_order(nodes: List[FlowNode], flow_starts: List[FlowNode] = None): skip_nodes = determine_nodes_to_skip(nodes=nodes) From dcf799b9a4250a84c399627abf7f71ab87f43506 Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Wed, 20 Aug 2025 13:44:39 +0100 Subject: [PATCH 24/43] Merging skip_nodes and determine_execution_order --- flowfile_core/flowfile_core/flowfile/util/execution_orderer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py b/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py index 85537a53..02b75bb2 100644 --- a/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py +++ b/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py @@ -2,7 +2,7 @@ from flowfile_core.flowfile.flow_node.flow_node import FlowNode from flowfile_core.configs import logger from collections import deque, defaultdict -from node_skipper import determine_nodes_to_skip +from flowfile_core.flowfile.util.node_skipper import determine_nodes_to_skip def compute_skip_nodes_execution_order(nodes: List[FlowNode], flow_starts: List[FlowNode] = None): skip_nodes = determine_nodes_to_skip(nodes=nodes) From 06b044bf15662289c5bf14f1b24a49390463dabe Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Thu, 21 Aug 2025 11:05:44 +0100 Subject: [PATCH 25/43] Adding new graph_tree method with tests --- .../flowfile_core/flowfile/flow_graph.py | 24 +- .../flowfile_core/flowfile/util/graph_tree.py | 258 ++++++++++++++++++ .../tests/flowfile/test_code_generator.py | 5 +- 3 files changed, 264 insertions(+), 23 deletions(-) create mode 100644 flowfile_core/flowfile_core/flowfile/util/graph_tree.py diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py index d7ccf8e6..ccc32eed 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_graph.py +++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py @@ -34,6 +34,7 @@ from flowfile_core.flowfile.analytics.utils import create_graphic_walker_node_from_node_promise from flowfile_core.flowfile.flow_node.flow_node import FlowNode from flowfile_core.flowfile.util.execution_orderer import compute_skip_nodes_execution_order +from flowfile_core.flowfile.util.graph_tree import graph_tree from flowfile_core.flowfile.flow_data_engine.polars_code_parser import polars_code_parser from flowfile_core.flowfile.flow_data_engine.subprocess_operations.subprocess_operations import (ExternalDatabaseFetcher, ExternalDatabaseWriter, @@ -317,28 +318,7 @@ def print_tree(self): """ Print flow_graph as a tree. """ - max_node_id = max(self._node_db.keys()) - - tree = "" - tabs = 0 - - ordered_nodes = [i.node_id for i in self.execution_order] - - for node in ordered_nodes: - tabs += 1 - node_input = node.setting_input - operation = str(self._node_db[node_input.node_id]).split("(")[1][:-1].replace("_", " ").title() - - tree += str(operation) + " (id=" + str(node_input.node_id) + ")" - - if node_input.description: - tree += ": " + str(node_input.description) - - if node_input.node_id < max_node_id: - tree += "\n" + "# " + " "*3*(tabs-1) + "|___ " - print("\n"*2) - - return print(tree) + return print(graph_tree(self)) def get_nodes_overview(self): """Gets a list of dictionary representations for all nodes in the graph.""" diff --git a/flowfile_core/flowfile_core/flowfile/util/graph_tree.py b/flowfile_core/flowfile_core/flowfile/util/graph_tree.py new file mode 100644 index 00000000..8cb0302c --- /dev/null +++ b/flowfile_core/flowfile_core/flowfile/util/graph_tree.py @@ -0,0 +1,258 @@ +from flowfile_core.flowfile.flow_graph import FlowGraph +from flowfile_core.flowfile.util.execution_orderer import compute_skip_nodes_execution_order + +def graph_tree(graph:FlowGraph): + """ + Print flow_graph as a visual tree structure, showing the DAG relationships with ASCII art. + """ + if not graph._node_db: + print("Empty flow graph") + return + + # Build node information + node_info = {} + for node in graph.nodes: + node_id = node.node_id + + # Get node label + operation = node.node_type.replace("_", " ").title() if node.node_type else "Unknown" + label = f"{operation} (id={node_id})" + if hasattr(node, 'setting_input') and hasattr(node.setting_input, 'description'): + if node.setting_input.description: + desc = node.setting_input.description + if len(desc) > 20: # Truncate long descriptions + desc = desc[:17] + "..." + label = f"{operation} ({node_id}): {desc}" + + # Get inputs and outputs + inputs = { + 'main': [n.node_id for n in (node.node_inputs.main_inputs or [])], + 'left': node.node_inputs.left_input.node_id if node.node_inputs.left_input else None, + 'right': node.node_inputs.right_input.node_id if node.node_inputs.right_input else None + } + outputs = [n.node_id for n in node.leads_to_nodes] + + node_info[node_id] = { + 'label': label, + 'short_label': f"{operation} ({node_id})", + 'inputs': inputs, + 'outputs': outputs, + 'depth': 0 + } + + + # Calculate depths for all nodes + for node_id in node_info: + calculate_depth(node_id, node_info) + + # Group nodes by depth + depth_groups = {} + max_depth = 0 + for node_id, info in node_info.items(): + depth = info['depth'] + max_depth = max(max_depth, depth) + if depth not in depth_groups: + depth_groups[depth] = [] + depth_groups[depth].append(node_id) + + # Sort nodes within each depth group + for depth in depth_groups: + depth_groups[depth].sort() + + # Create the main flow visualization + lines = [] + lines.append("=" * 80) + lines.append("Flow Graph Visualization") + lines.append("=" * 80) + lines.append("") + + # Track which nodes connect to what + merge_points = {} # target_id -> list of source_ids + for node_id, info in node_info.items(): + for output_id in info['outputs']: + if output_id not in merge_points: + merge_points[output_id] = [] + merge_points[output_id].append(node_id) + + # Build the flow paths + paths = [] # List of paths through the graph + visited_in_paths = set() + + # Find all root nodes (no inputs) + root_nodes = [nid for nid, info in node_info.items() + if not info['inputs']['main'] and not info['inputs']['left'] and not info['inputs']['right']] + + if not root_nodes and graph._flow_starts: + root_nodes = [n.node_id for n in graph._flow_starts] + + # Get all paths + for root_id in root_nodes: + paths.extend(trace_path(root_id, node_info, merge_points)) + + # Find the maximum label length for each depth level + max_label_length = {} + for depth in range(max_depth + 1): + if depth in depth_groups: + max_len = max(len(node_info[nid]['label']) for nid in depth_groups[depth]) + max_label_length[depth] = max_len + + # Draw the paths + drawn_nodes = set() + merge_drawn = set() + + # Group paths by their merge points + paths_by_merge = {} + standalone_paths = [] + + for path in paths: + if len(path) > 1 and path[-1] in merge_points and len(merge_points[path[-1]]) > 1: + merge_id = path[-1] + if merge_id not in paths_by_merge: + paths_by_merge[merge_id] = [] + paths_by_merge[merge_id].append(path) + else: + standalone_paths.append(path) + + # Draw merged paths + for merge_id, merge_paths in paths_by_merge.items(): + if merge_id in merge_drawn: + continue + + merge_info = node_info[merge_id] + sources = merge_points[merge_id] + + # Draw each source path leading to the merge + for i, source_id in enumerate(sources): + # Find the path containing this source + source_path = None + for path in merge_paths: + if source_id in path: + source_path = path[:path.index(source_id) + 1] + break + + if source_path: + # Build the line for this path + line_parts = [] + for j, nid in enumerate(source_path): + if j == 0: + line_parts.append(node_info[nid]['label']) + else: + line_parts.append(f" ──> {node_info[nid]['short_label']}") + + # Add the merge arrow + if i == 0: + # First source + line = "".join(line_parts) + " ─────┐" + lines.append(line) + elif i == len(sources) - 1: + # Last source + line = "".join(line_parts) + " ─────┴──> " + merge_info['label'] + lines.append(line) + + # Continue with the rest of the path after merge + remaining = node_info[merge_id]['outputs'] + while remaining: + next_id = remaining[0] + lines[-1] += f" ──> {node_info[next_id]['label']}" + remaining = node_info[next_id]['outputs'] + drawn_nodes.add(next_id) + else: + # Middle sources + line = "".join(line_parts) + " ─────┤" + lines.append(line) + + for nid in source_path: + drawn_nodes.add(nid) + + drawn_nodes.add(merge_id) + merge_drawn.add(merge_id) + lines.append("") # Add spacing between merge groups + + # Draw standalone paths + for path in standalone_paths: + if all(nid in drawn_nodes for nid in path): + continue + + line_parts = [] + for i, node_id in enumerate(path): + if node_id not in drawn_nodes: + if i == 0: + line_parts.append(node_info[node_id]['label']) + else: + line_parts.append(f" ──> {node_info[node_id]['short_label']}") + drawn_nodes.add(node_id) + + if line_parts: + lines.append("".join(line_parts)) + + # Add any remaining undrawn nodes + for node_id in node_info: + if node_id not in drawn_nodes: + lines.append(node_info[node_id]['label'] + " (isolated)") + + lines.append("") + lines.append("=" * 80) + lines.append("Execution Order") + lines.append("=" * 80) + + try: + skip_nodes, ordered_nodes = compute_skip_nodes_execution_order(nodes=graph.nodes,flow_starts=graph._flow_starts+graph.get_implicit_starter_nodes()) + if ordered_nodes: + for i, node in enumerate(ordered_nodes, 1): + lines.append(f" {i:3d}. {node_info[node.node_id]['label']}") + except Exception as e: + lines.append(f" Could not determine execution order: {e}") + + # Print everything + output = "\n".join(lines) + print(output) + + return output + + + +# Calculate depth for each node +def calculate_depth(node_id, node_info, visited=None): + if visited is None: + visited = set() + if node_id in visited: + return node_info[node_id]['depth'] + visited.add(node_id) + + max_input_depth = -1 + inputs = node_info[node_id]['inputs'] + + for main_id in inputs['main']: + max_input_depth = max(max_input_depth, calculate_depth(main_id, node_info, visited)) + if inputs['left']: + max_input_depth = max(max_input_depth, calculate_depth(inputs['left'], node_info, visited)) + if inputs['right']: + max_input_depth = max(max_input_depth, calculate_depth(inputs['right'], node_info, visited)) + + node_info[node_id]['depth'] = max_input_depth + 1 + return node_info[node_id]['depth'] + + +# Trace paths from each root +def trace_path(node_id, node_info, merge_points, current_path=None): + if current_path is None: + current_path = [] + + current_path = current_path + [node_id] + outputs = node_info[node_id]['outputs'] + + if not outputs: + # End of path + return [current_path] + + # If this node has multiple outputs or connects to a merge point, branch + all_paths = [] + for output_id in outputs: + if output_id in merge_points and len(merge_points[output_id]) > 1: + # This is a merge point, end this path here + all_paths.append(current_path + [output_id]) + else: + # Continue the path + all_paths.extend(trace_path(output_id, node_info, merge_points, current_path)) + + return all_paths \ No newline at end of file diff --git a/flowfile_core/tests/flowfile/test_code_generator.py b/flowfile_core/tests/flowfile/test_code_generator.py index bc7b6a08..f4b58672 100644 --- a/flowfile_core/tests/flowfile/test_code_generator.py +++ b/flowfile_core/tests/flowfile/test_code_generator.py @@ -2379,7 +2379,7 @@ def test_aggregation_functions(): expected_df = flow.get_node(2).get_resulting_data().data_frame assert_frame_equal(result_df, expected_df, check_row_order=False) -def test_print_tree(): +def test_graph_tree(): """ Test made to the FlowGraph's print_tree method""" flow = create_basic_flow() @@ -2472,6 +2472,9 @@ def test_print_tree(): expected_df = flow.get_node(6).get_resulting_data().data_frame assert_frame_equal(result_df, expected_df, check_row_order=False) + tree_elements = ["(id=", ">", "1.", "Execution Order", "Flow Graph Visualization", "="] + assert any(element for element in tree_elements) in print(flow.print_tree()) + def test_flow_with_disconnected_nodes(): """Test a flow where some nodes might not be connected properly""" flow = create_basic_flow() From da5984c3b5f8b43ce29e841ae30b930a9ee905a5 Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Thu, 21 Aug 2025 11:07:58 +0100 Subject: [PATCH 26/43] Adding new graph_tree method with tests --- flowfile_core/flowfile_core/flowfile/flow_graph.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py index ccc32eed..01d09a2f 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_graph.py +++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py @@ -228,7 +228,7 @@ def __init__(self, elif input_flow is not None: self.add_datasource(input_file=input_flow) - self.skip_nodes, self.execution_order = compute_skip_nodes_execution_order(nodes=self.nodes,flow_starts=self._flow_starts+self.get_implicit_starter_nodes()) + skip_nodes, execution_order = compute_skip_nodes_execution_order(nodes=self.nodes,flow_starts=self._flow_starts+self.get_implicit_starter_nodes()) def add_node_promise(self, node_promise: input_schema.NodePromise): """Adds a placeholder node to the graph that is not yet fully configured. @@ -1527,17 +1527,17 @@ def run_graph(self) -> RunInformation | None: self.end_datetime = None self.latest_run_info = None self.flow_logger.info('Starting to run flowfile flow...') - self.skip_nodes, self.execution_order = compute_skip_nodes_execution_order(nodes=self.nodes, flow_starts=self._flow_starts+self.get_implicit_starter_nodes()) + skip_nodes, execution_order = compute_skip_nodes_execution_order(nodes=self.nodes, flow_starts=self._flow_starts+self.get_implicit_starter_nodes()) - skip_node_message(self.flow_logger, self.skip_nodes) - execution_order_message(self.flow_logger, self.execution_order) + skip_node_message(self.flow_logger, skip_nodes) + execution_order_message(self.flow_logger, execution_order) performance_mode = self.flow_settings.execution_mode == 'Performance' for node in self.execution_order: node_logger = self.flow_logger.get_node_logger(node.node_id) if self.flow_settings.is_canceled: self.flow_logger.info('Flow canceled') break - if node in self.skip_nodes: + if node in skip_nodes: node_logger.info(f'Skipping node {node.node_id}') continue node_result = NodeResult(node_id=node.node_id, node_name=node.name) From c3d1688281ce11a98cc984b467ad8babf54b90f2 Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Thu, 21 Aug 2025 11:13:52 +0100 Subject: [PATCH 27/43] Adding new graph_tree method with tests --- flowfile_core/flowfile_core/flowfile/util/graph_tree.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flowfile_core/flowfile_core/flowfile/util/graph_tree.py b/flowfile_core/flowfile_core/flowfile/util/graph_tree.py index 8cb0302c..3f5a4572 100644 --- a/flowfile_core/flowfile_core/flowfile/util/graph_tree.py +++ b/flowfile_core/flowfile_core/flowfile/util/graph_tree.py @@ -1,7 +1,7 @@ -from flowfile_core.flowfile.flow_graph import FlowGraph +import flowfile as ff from flowfile_core.flowfile.util.execution_orderer import compute_skip_nodes_execution_order -def graph_tree(graph:FlowGraph): +def graph_tree(graph:ff.FlowGraph): """ Print flow_graph as a visual tree structure, showing the DAG relationships with ASCII art. """ From 613482003c11e0cd8db26a9dc6e10923f949f859 Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Thu, 21 Aug 2025 11:17:13 +0100 Subject: [PATCH 28/43] Adding new graph_tree method with tests --- flowfile_core/flowfile_core/flowfile/util/execution_orderer.py | 1 + flowfile_core/flowfile_core/flowfile/util/node_skipper.py | 1 + 2 files changed, 2 insertions(+) diff --git a/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py b/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py index 02b75bb2..2891e739 100644 --- a/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py +++ b/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py @@ -5,6 +5,7 @@ from flowfile_core.flowfile.util.node_skipper import determine_nodes_to_skip def compute_skip_nodes_execution_order(nodes: List[FlowNode], flow_starts: List[FlowNode] = None): + """ Computes the execution order after finding the nodes to skip on the execution step.""" skip_nodes = determine_nodes_to_skip(nodes=nodes) computed_execution_order = determine_execution_order(all_nodes=[node for node in nodes if node not in skip_nodes], flow_starts=flow_starts) diff --git a/flowfile_core/flowfile_core/flowfile/util/node_skipper.py b/flowfile_core/flowfile_core/flowfile/util/node_skipper.py index 37fbeac1..f3b27110 100644 --- a/flowfile_core/flowfile_core/flowfile/util/node_skipper.py +++ b/flowfile_core/flowfile_core/flowfile/util/node_skipper.py @@ -2,6 +2,7 @@ from flowfile_core.flowfile.flow_node.flow_node import FlowNode def determine_nodes_to_skip(nodes : List[FlowNode]) -> List[FlowNode]: + """ Finds nodes to skip on the execution step. """ skip_nodes = [node for node in nodes if not node.is_correct] skip_nodes.extend([lead_to_node for node in skip_nodes for lead_to_node in node.leads_to_nodes]) return skip_nodes \ No newline at end of file From 8f8ff5aba126182ff04fcf7575ac22078df9878a Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Thu, 21 Aug 2025 11:23:18 +0100 Subject: [PATCH 29/43] Adding new graph_tree method with tests --- flowfile_core/tests/flowfile/test_code_generator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowfile_core/tests/flowfile/test_code_generator.py b/flowfile_core/tests/flowfile/test_code_generator.py index f4b58672..4ce4ab52 100644 --- a/flowfile_core/tests/flowfile/test_code_generator.py +++ b/flowfile_core/tests/flowfile/test_code_generator.py @@ -2473,7 +2473,7 @@ def test_graph_tree(): assert_frame_equal(result_df, expected_df, check_row_order=False) tree_elements = ["(id=", ">", "1.", "Execution Order", "Flow Graph Visualization", "="] - assert any(element for element in tree_elements) in print(flow.print_tree()) + assert any(element for element in tree_elements) in flow.print_tree() def test_flow_with_disconnected_nodes(): """Test a flow where some nodes might not be connected properly""" From 3c9111c31e11ebb7bf246e2309953ee70f422346 Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Thu, 21 Aug 2025 11:28:49 +0100 Subject: [PATCH 30/43] Adding new graph_tree method with tests --- flowfile/flowfile/api.py | 1 + 1 file changed, 1 insertion(+) diff --git a/flowfile/flowfile/api.py b/flowfile/flowfile/api.py index cfbd2021..33a3ff54 100644 --- a/flowfile/flowfile/api.py +++ b/flowfile/flowfile/api.py @@ -17,6 +17,7 @@ from subprocess import Popen from flowfile_core.flowfile.flow_graph import FlowGraph from tempfile import TemporaryDirectory +import flowfile as ff # Configuration FLOWFILE_HOST: str = os.environ.get("FLOWFILE_HOST", "127.0.0.1") From 88b10481fdece8a0e67c0207d73dbc5ae435825e Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Thu, 21 Aug 2025 11:31:07 +0100 Subject: [PATCH 31/43] Adding new graph_tree method with tests --- flowfile_core/flowfile_core/flowfile/flow_graph.py | 4 ++-- .../flowfile_core/flowfile/util/execution_orderer.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py index 01d09a2f..6fea0c28 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_graph.py +++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py @@ -33,7 +33,7 @@ from flowfile_core.flowfile.utils import snake_case_to_camel_case from flowfile_core.flowfile.analytics.utils import create_graphic_walker_node_from_node_promise from flowfile_core.flowfile.flow_node.flow_node import FlowNode -from flowfile_core.flowfile.util.execution_orderer import compute_skip_nodes_execution_order +from flowfile_core.flowfile.util.execution_orderer import compute_execution_plan from flowfile_core.flowfile.util.graph_tree import graph_tree from flowfile_core.flowfile.flow_data_engine.polars_code_parser import polars_code_parser from flowfile_core.flowfile.flow_data_engine.subprocess_operations.subprocess_operations import (ExternalDatabaseFetcher, @@ -228,7 +228,7 @@ def __init__(self, elif input_flow is not None: self.add_datasource(input_file=input_flow) - skip_nodes, execution_order = compute_skip_nodes_execution_order(nodes=self.nodes,flow_starts=self._flow_starts+self.get_implicit_starter_nodes()) + skip_nodes, execution_order = compute_execution_plan(nodes=self.nodes,flow_starts=self._flow_starts+self.get_implicit_starter_nodes()) def add_node_promise(self, node_promise: input_schema.NodePromise): """Adds a placeholder node to the graph that is not yet fully configured. diff --git a/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py b/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py index 2891e739..dde3ec5a 100644 --- a/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py +++ b/flowfile_core/flowfile_core/flowfile/util/execution_orderer.py @@ -4,7 +4,7 @@ from collections import deque, defaultdict from flowfile_core.flowfile.util.node_skipper import determine_nodes_to_skip -def compute_skip_nodes_execution_order(nodes: List[FlowNode], flow_starts: List[FlowNode] = None): +def compute_execution_plan(nodes: List[FlowNode], flow_starts: List[FlowNode] = None): """ Computes the execution order after finding the nodes to skip on the execution step.""" skip_nodes = determine_nodes_to_skip(nodes=nodes) computed_execution_order = determine_execution_order(all_nodes=[node for node in nodes if node not in skip_nodes], From 3e1cace7c5e034c0aadfc041132e1af63d242b54 Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Thu, 21 Aug 2025 11:36:48 +0100 Subject: [PATCH 32/43] Adding new graph_tree method with tests --- flowfile/flowfile/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/flowfile/flowfile/__init__.py b/flowfile/flowfile/__init__.py index de7db989..7d1bf37b 100644 --- a/flowfile/flowfile/__init__.py +++ b/flowfile/flowfile/__init__.py @@ -21,7 +21,6 @@ FlowFrame ) from flowfile_core.schemas.cloud_storage_schemas import FullCloudStorageConnection -from flowfile_core.flowfile.flow_graph import FlowGraph from flowfile_core.flowfile.flow_data_engine.flow_data_engine import FlowDataEngine from flowfile_core.flowfile.flow_node.flow_node import FlowNode from flowfile_core.flowfile.flow_data_engine.flow_file_column.main import FlowfileColumn From 0a7faa8f4b7065b1eaa850901f79c5cadc7c5e06 Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Thu, 21 Aug 2025 11:38:21 +0100 Subject: [PATCH 33/43] Adding new graph_tree method with tests --- flowfile/flowfile/__init__.py | 1 + flowfile_core/flowfile_core/flowfile/util/graph_tree.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/flowfile/flowfile/__init__.py b/flowfile/flowfile/__init__.py index 7d1bf37b..de7db989 100644 --- a/flowfile/flowfile/__init__.py +++ b/flowfile/flowfile/__init__.py @@ -21,6 +21,7 @@ FlowFrame ) from flowfile_core.schemas.cloud_storage_schemas import FullCloudStorageConnection +from flowfile_core.flowfile.flow_graph import FlowGraph from flowfile_core.flowfile.flow_data_engine.flow_data_engine import FlowDataEngine from flowfile_core.flowfile.flow_node.flow_node import FlowNode from flowfile_core.flowfile.flow_data_engine.flow_file_column.main import FlowfileColumn diff --git a/flowfile_core/flowfile_core/flowfile/util/graph_tree.py b/flowfile_core/flowfile_core/flowfile/util/graph_tree.py index 3f5a4572..984886aa 100644 --- a/flowfile_core/flowfile_core/flowfile/util/graph_tree.py +++ b/flowfile_core/flowfile_core/flowfile/util/graph_tree.py @@ -1,7 +1,7 @@ -import flowfile as ff from flowfile_core.flowfile.util.execution_orderer import compute_skip_nodes_execution_order +from flowfile_core.flowfile.flow_graph import FlowGraph -def graph_tree(graph:ff.FlowGraph): +def graph_tree(graph:FlowGraph): """ Print flow_graph as a visual tree structure, showing the DAG relationships with ASCII art. """ From 2eef53785c5ae79b4282a188eb35fd148e6cc11f Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Thu, 21 Aug 2025 11:41:03 +0100 Subject: [PATCH 34/43] Adding new graph_tree method with tests --- flowfile_core/flowfile_core/flowfile/flow_graph.py | 2 +- flowfile_core/flowfile_core/flowfile/util/graph_tree.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py index 6fea0c28..39aefe59 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_graph.py +++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py @@ -1527,7 +1527,7 @@ def run_graph(self) -> RunInformation | None: self.end_datetime = None self.latest_run_info = None self.flow_logger.info('Starting to run flowfile flow...') - skip_nodes, execution_order = compute_skip_nodes_execution_order(nodes=self.nodes, flow_starts=self._flow_starts+self.get_implicit_starter_nodes()) + skip_nodes, execution_order = compute_execution_plan(nodes=self.nodes, flow_starts=self._flow_starts+self.get_implicit_starter_nodes()) skip_node_message(self.flow_logger, skip_nodes) execution_order_message(self.flow_logger, execution_order) diff --git a/flowfile_core/flowfile_core/flowfile/util/graph_tree.py b/flowfile_core/flowfile_core/flowfile/util/graph_tree.py index 984886aa..76b7184e 100644 --- a/flowfile_core/flowfile_core/flowfile/util/graph_tree.py +++ b/flowfile_core/flowfile_core/flowfile/util/graph_tree.py @@ -1,4 +1,4 @@ -from flowfile_core.flowfile.util.execution_orderer import compute_skip_nodes_execution_order +from flowfile_core.flowfile.util.execution_orderer import compute_execution_plan from flowfile_core.flowfile.flow_graph import FlowGraph def graph_tree(graph:FlowGraph): @@ -196,7 +196,7 @@ def graph_tree(graph:FlowGraph): lines.append("=" * 80) try: - skip_nodes, ordered_nodes = compute_skip_nodes_execution_order(nodes=graph.nodes,flow_starts=graph._flow_starts+graph.get_implicit_starter_nodes()) + skip_nodes, ordered_nodes = compute_execution_plan(nodes=graph.nodes,flow_starts=graph._flow_starts+graph.get_implicit_starter_nodes()) if ordered_nodes: for i, node in enumerate(ordered_nodes, 1): lines.append(f" {i:3d}. {node_info[node.node_id]['label']}") From e7e37e1f0aa44bf3a776b849987e76dae4b1cc13 Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Thu, 21 Aug 2025 11:51:47 +0100 Subject: [PATCH 35/43] Adding new graph_tree method with tests --- .../flowfile_core/flowfile/flow_graph.py | 212 +++++++++++++++++- .../flowfile_core/flowfile/util/graph_tree.py | 212 ------------------ 2 files changed, 206 insertions(+), 218 deletions(-) diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py index 39aefe59..55a74871 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_graph.py +++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py @@ -34,7 +34,7 @@ from flowfile_core.flowfile.analytics.utils import create_graphic_walker_node_from_node_promise from flowfile_core.flowfile.flow_node.flow_node import FlowNode from flowfile_core.flowfile.util.execution_orderer import compute_execution_plan -from flowfile_core.flowfile.util.graph_tree import graph_tree +from flowfile_core.flowfile.util.graph_tree import calculate_depth, trace_path from flowfile_core.flowfile.flow_data_engine.polars_code_parser import polars_code_parser from flowfile_core.flowfile.flow_data_engine.subprocess_operations.subprocess_operations import (ExternalDatabaseFetcher, ExternalDatabaseWriter, @@ -314,11 +314,211 @@ def __repr__(self): settings_str = " -" + '\n -'.join(f"{k}: {v}" for k, v in self.flow_settings) return f"FlowGraph(\nNodes: {self._node_db}\n\nSettings:\n{settings_str}" - def print_tree(self): - """ - Print flow_graph as a tree. - """ - return print(graph_tree(self)) + def graph_tree(self): + """Print flow_graph as a visual tree structure, showing the DAG relationships with ASCII art.""" + if not self._node_db: + print("Empty flow graph") + return + + # Build node information + node_info = {} + for node in self.nodes: + node_id = node.node_id + + # Get node label + operation = node.node_type.replace("_", " ").title() if node.node_type else "Unknown" + label = f"{operation} (id={node_id})" + if hasattr(node, 'setting_input') and hasattr(node.setting_input, 'description'): + if node.setting_input.description: + desc = node.setting_input.description + if len(desc) > 20: # Truncate long descriptions + desc = desc[:17] + "..." + label = f"{operation} ({node_id}): {desc}" + + # Get inputs and outputs + inputs = { + 'main': [n.node_id for n in (node.node_inputs.main_inputs or [])], + 'left': node.node_inputs.left_input.node_id if node.node_inputs.left_input else None, + 'right': node.node_inputs.right_input.node_id if node.node_inputs.right_input else None + } + outputs = [n.node_id for n in node.leads_to_nodes] + + node_info[node_id] = { + 'label': label, + 'short_label': f"{operation} ({node_id})", + 'inputs': inputs, + 'outputs': outputs, + 'depth': 0 + } + + + # Calculate depths for all nodes + for node_id in node_info: + calculate_depth(node_id, node_info) + + # Group nodes by depth + depth_groups = {} + max_depth = 0 + for node_id, info in node_info.items(): + depth = info['depth'] + max_depth = max(max_depth, depth) + if depth not in depth_groups: + depth_groups[depth] = [] + depth_groups[depth].append(node_id) + + # Sort nodes within each depth group + for depth in depth_groups: + depth_groups[depth].sort() + + # Create the main flow visualization + lines = [] + lines.append("=" * 80) + lines.append("Flow Graph Visualization") + lines.append("=" * 80) + lines.append("") + + # Track which nodes connect to what + merge_points = {} # target_id -> list of source_ids + for node_id, info in node_info.items(): + for output_id in info['outputs']: + if output_id not in merge_points: + merge_points[output_id] = [] + merge_points[output_id].append(node_id) + + # Build the flow paths + paths = [] # List of paths through the graph + visited_in_paths = set() + + # Find all root nodes (no inputs) + root_nodes = [nid for nid, info in node_info.items() + if not info['inputs']['main'] and not info['inputs']['left'] and not info['inputs']['right']] + + if not root_nodes and self._flow_starts: + root_nodes = [n.node_id for n in self._flow_starts] + + # Get all paths + for root_id in root_nodes: + paths.extend(trace_path(root_id, node_info, merge_points)) + + # Find the maximum label length for each depth level + max_label_length = {} + for depth in range(max_depth + 1): + if depth in depth_groups: + max_len = max(len(node_info[nid]['label']) for nid in depth_groups[depth]) + max_label_length[depth] = max_len + + # Draw the paths + drawn_nodes = set() + merge_drawn = set() + + # Group paths by their merge points + paths_by_merge = {} + standalone_paths = [] + + for path in paths: + if len(path) > 1 and path[-1] in merge_points and len(merge_points[path[-1]]) > 1: + merge_id = path[-1] + if merge_id not in paths_by_merge: + paths_by_merge[merge_id] = [] + paths_by_merge[merge_id].append(path) + else: + standalone_paths.append(path) + + # Draw merged paths + for merge_id, merge_paths in paths_by_merge.items(): + if merge_id in merge_drawn: + continue + + merge_info = node_info[merge_id] + sources = merge_points[merge_id] + + # Draw each source path leading to the merge + for i, source_id in enumerate(sources): + # Find the path containing this source + source_path = None + for path in merge_paths: + if source_id in path: + source_path = path[:path.index(source_id) + 1] + break + + if source_path: + # Build the line for this path + line_parts = [] + for j, nid in enumerate(source_path): + if j == 0: + line_parts.append(node_info[nid]['label']) + else: + line_parts.append(f" ──> {node_info[nid]['short_label']}") + + # Add the merge arrow + if i == 0: + # First source + line = "".join(line_parts) + " ─────┐" + lines.append(line) + elif i == len(sources) - 1: + # Last source + line = "".join(line_parts) + " ─────┴──> " + merge_info['label'] + lines.append(line) + + # Continue with the rest of the path after merge + remaining = node_info[merge_id]['outputs'] + while remaining: + next_id = remaining[0] + lines[-1] += f" ──> {node_info[next_id]['label']}" + remaining = node_info[next_id]['outputs'] + drawn_nodes.add(next_id) + else: + # Middle sources + line = "".join(line_parts) + " ─────┤" + lines.append(line) + + for nid in source_path: + drawn_nodes.add(nid) + + drawn_nodes.add(merge_id) + merge_drawn.add(merge_id) + lines.append("") # Add spacing between merge groups + + # Draw standalone paths + for path in standalone_paths: + if all(nid in drawn_nodes for nid in path): + continue + + line_parts = [] + for i, node_id in enumerate(path): + if node_id not in drawn_nodes: + if i == 0: + line_parts.append(node_info[node_id]['label']) + else: + line_parts.append(f" ──> {node_info[node_id]['short_label']}") + drawn_nodes.add(node_id) + + if line_parts: + lines.append("".join(line_parts)) + + # Add any remaining undrawn nodes + for node_id in node_info: + if node_id not in drawn_nodes: + lines.append(node_info[node_id]['label'] + " (isolated)") + + lines.append("") + lines.append("=" * 80) + lines.append("Execution Order") + lines.append("=" * 80) + + try: + skip_nodes, ordered_nodes = compute_execution_plan(nodes=self.nodes,flow_starts=self._flow_starts+self.get_implicit_starter_nodes()) + if ordered_nodes: + for i, node in enumerate(ordered_nodes, 1): + lines.append(f" {i:3d}. {node_info[node.node_id]['label']}") + except Exception as e: + lines.append(f" Could not determine execution order: {e}") + + # Print everything + output = "\n".join(lines) + print(output) + + return output def get_nodes_overview(self): """Gets a list of dictionary representations for all nodes in the graph.""" diff --git a/flowfile_core/flowfile_core/flowfile/util/graph_tree.py b/flowfile_core/flowfile_core/flowfile/util/graph_tree.py index 76b7184e..146cba81 100644 --- a/flowfile_core/flowfile_core/flowfile/util/graph_tree.py +++ b/flowfile_core/flowfile_core/flowfile/util/graph_tree.py @@ -1,215 +1,3 @@ -from flowfile_core.flowfile.util.execution_orderer import compute_execution_plan -from flowfile_core.flowfile.flow_graph import FlowGraph - -def graph_tree(graph:FlowGraph): - """ - Print flow_graph as a visual tree structure, showing the DAG relationships with ASCII art. - """ - if not graph._node_db: - print("Empty flow graph") - return - - # Build node information - node_info = {} - for node in graph.nodes: - node_id = node.node_id - - # Get node label - operation = node.node_type.replace("_", " ").title() if node.node_type else "Unknown" - label = f"{operation} (id={node_id})" - if hasattr(node, 'setting_input') and hasattr(node.setting_input, 'description'): - if node.setting_input.description: - desc = node.setting_input.description - if len(desc) > 20: # Truncate long descriptions - desc = desc[:17] + "..." - label = f"{operation} ({node_id}): {desc}" - - # Get inputs and outputs - inputs = { - 'main': [n.node_id for n in (node.node_inputs.main_inputs or [])], - 'left': node.node_inputs.left_input.node_id if node.node_inputs.left_input else None, - 'right': node.node_inputs.right_input.node_id if node.node_inputs.right_input else None - } - outputs = [n.node_id for n in node.leads_to_nodes] - - node_info[node_id] = { - 'label': label, - 'short_label': f"{operation} ({node_id})", - 'inputs': inputs, - 'outputs': outputs, - 'depth': 0 - } - - - # Calculate depths for all nodes - for node_id in node_info: - calculate_depth(node_id, node_info) - - # Group nodes by depth - depth_groups = {} - max_depth = 0 - for node_id, info in node_info.items(): - depth = info['depth'] - max_depth = max(max_depth, depth) - if depth not in depth_groups: - depth_groups[depth] = [] - depth_groups[depth].append(node_id) - - # Sort nodes within each depth group - for depth in depth_groups: - depth_groups[depth].sort() - - # Create the main flow visualization - lines = [] - lines.append("=" * 80) - lines.append("Flow Graph Visualization") - lines.append("=" * 80) - lines.append("") - - # Track which nodes connect to what - merge_points = {} # target_id -> list of source_ids - for node_id, info in node_info.items(): - for output_id in info['outputs']: - if output_id not in merge_points: - merge_points[output_id] = [] - merge_points[output_id].append(node_id) - - # Build the flow paths - paths = [] # List of paths through the graph - visited_in_paths = set() - - # Find all root nodes (no inputs) - root_nodes = [nid for nid, info in node_info.items() - if not info['inputs']['main'] and not info['inputs']['left'] and not info['inputs']['right']] - - if not root_nodes and graph._flow_starts: - root_nodes = [n.node_id for n in graph._flow_starts] - - # Get all paths - for root_id in root_nodes: - paths.extend(trace_path(root_id, node_info, merge_points)) - - # Find the maximum label length for each depth level - max_label_length = {} - for depth in range(max_depth + 1): - if depth in depth_groups: - max_len = max(len(node_info[nid]['label']) for nid in depth_groups[depth]) - max_label_length[depth] = max_len - - # Draw the paths - drawn_nodes = set() - merge_drawn = set() - - # Group paths by their merge points - paths_by_merge = {} - standalone_paths = [] - - for path in paths: - if len(path) > 1 and path[-1] in merge_points and len(merge_points[path[-1]]) > 1: - merge_id = path[-1] - if merge_id not in paths_by_merge: - paths_by_merge[merge_id] = [] - paths_by_merge[merge_id].append(path) - else: - standalone_paths.append(path) - - # Draw merged paths - for merge_id, merge_paths in paths_by_merge.items(): - if merge_id in merge_drawn: - continue - - merge_info = node_info[merge_id] - sources = merge_points[merge_id] - - # Draw each source path leading to the merge - for i, source_id in enumerate(sources): - # Find the path containing this source - source_path = None - for path in merge_paths: - if source_id in path: - source_path = path[:path.index(source_id) + 1] - break - - if source_path: - # Build the line for this path - line_parts = [] - for j, nid in enumerate(source_path): - if j == 0: - line_parts.append(node_info[nid]['label']) - else: - line_parts.append(f" ──> {node_info[nid]['short_label']}") - - # Add the merge arrow - if i == 0: - # First source - line = "".join(line_parts) + " ─────┐" - lines.append(line) - elif i == len(sources) - 1: - # Last source - line = "".join(line_parts) + " ─────┴──> " + merge_info['label'] - lines.append(line) - - # Continue with the rest of the path after merge - remaining = node_info[merge_id]['outputs'] - while remaining: - next_id = remaining[0] - lines[-1] += f" ──> {node_info[next_id]['label']}" - remaining = node_info[next_id]['outputs'] - drawn_nodes.add(next_id) - else: - # Middle sources - line = "".join(line_parts) + " ─────┤" - lines.append(line) - - for nid in source_path: - drawn_nodes.add(nid) - - drawn_nodes.add(merge_id) - merge_drawn.add(merge_id) - lines.append("") # Add spacing between merge groups - - # Draw standalone paths - for path in standalone_paths: - if all(nid in drawn_nodes for nid in path): - continue - - line_parts = [] - for i, node_id in enumerate(path): - if node_id not in drawn_nodes: - if i == 0: - line_parts.append(node_info[node_id]['label']) - else: - line_parts.append(f" ──> {node_info[node_id]['short_label']}") - drawn_nodes.add(node_id) - - if line_parts: - lines.append("".join(line_parts)) - - # Add any remaining undrawn nodes - for node_id in node_info: - if node_id not in drawn_nodes: - lines.append(node_info[node_id]['label'] + " (isolated)") - - lines.append("") - lines.append("=" * 80) - lines.append("Execution Order") - lines.append("=" * 80) - - try: - skip_nodes, ordered_nodes = compute_execution_plan(nodes=graph.nodes,flow_starts=graph._flow_starts+graph.get_implicit_starter_nodes()) - if ordered_nodes: - for i, node in enumerate(ordered_nodes, 1): - lines.append(f" {i:3d}. {node_info[node.node_id]['label']}") - except Exception as e: - lines.append(f" Could not determine execution order: {e}") - - # Print everything - output = "\n".join(lines) - print(output) - - return output - - # Calculate depth for each node def calculate_depth(node_id, node_info, visited=None): From cb9eab5905ae68d27b53b8e865987801e0dcfe67 Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Thu, 21 Aug 2025 11:54:21 +0100 Subject: [PATCH 36/43] Adding new graph_tree method with tests --- flowfile_core/flowfile_core/flowfile/flow_graph.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py index 55a74871..c712c24d 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_graph.py +++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py @@ -314,7 +314,7 @@ def __repr__(self): settings_str = " -" + '\n -'.join(f"{k}: {v}" for k, v in self.flow_settings) return f"FlowGraph(\nNodes: {self._node_db}\n\nSettings:\n{settings_str}" - def graph_tree(self): + def print_tree(self): """Print flow_graph as a visual tree structure, showing the DAG relationships with ASCII art.""" if not self._node_db: print("Empty flow graph") From 1f610aa49838abefc6c41c73bbbe089ad0383b6f Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Thu, 21 Aug 2025 11:59:53 +0100 Subject: [PATCH 37/43] Adding new graph_tree method with tests --- flowfile_core/flowfile_core/flowfile/flow_graph.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py index c712c24d..4707ac6b 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_graph.py +++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py @@ -1732,7 +1732,7 @@ def run_graph(self) -> RunInformation | None: skip_node_message(self.flow_logger, skip_nodes) execution_order_message(self.flow_logger, execution_order) performance_mode = self.flow_settings.execution_mode == 'Performance' - for node in self.execution_order: + for node in execution_order: node_logger = self.flow_logger.get_node_logger(node.node_id) if self.flow_settings.is_canceled: self.flow_logger.info('Flow canceled') From 7a220dddb8328986b35b1de27c23f9e5867c5757 Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Thu, 21 Aug 2025 12:11:42 +0100 Subject: [PATCH 38/43] Adding new graph_tree method with tests --- flowfile_core/flowfile_core/flowfile/flow_graph.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py index 4707ac6b..9f99ae65 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_graph.py +++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py @@ -1765,7 +1765,7 @@ def run_graph(self) -> RunInformation | None: node_result.is_running = False node_logger.error(f'Error in node {node.node_id}: {e}') if not node_result.success: - self.skip_nodes.extend(list(node.get_all_dependent_nodes())) + skip_nodes.extend(list(node.get_all_dependent_nodes())) node_logger.info(f'Completed node with success: {node_result.success}') self.nodes_completed += 1 self.flow_logger.info('Flow completed!') From 16be30414d781ade26d0ba0112942ccb4312816b Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Thu, 21 Aug 2025 12:22:41 +0100 Subject: [PATCH 39/43] Adding new graph_tree method with tests --- flowfile_core/tests/flowfile/test_code_generator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowfile_core/tests/flowfile/test_code_generator.py b/flowfile_core/tests/flowfile/test_code_generator.py index 4ce4ab52..896a4353 100644 --- a/flowfile_core/tests/flowfile/test_code_generator.py +++ b/flowfile_core/tests/flowfile/test_code_generator.py @@ -2473,7 +2473,7 @@ def test_graph_tree(): assert_frame_equal(result_df, expected_df, check_row_order=False) tree_elements = ["(id=", ">", "1.", "Execution Order", "Flow Graph Visualization", "="] - assert any(element for element in tree_elements) in flow.print_tree() + assert all(element in flow.print_tree() for element in tree_elements) def test_flow_with_disconnected_nodes(): """Test a flow where some nodes might not be connected properly""" From 1a81d9406e181bc7bee029bbddcf6f6bea575815 Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Fri, 22 Aug 2025 10:44:58 +0100 Subject: [PATCH 40/43] Adding new graph_tree method with tests --- flowfile_core/flowfile_core/flowfile/flow_graph.py | 3 +-- flowfile_core/tests/flowfile/test_code_generator.py | 8 ++++++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py index 9f99ae65..6d773c64 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_graph.py +++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py @@ -516,9 +516,8 @@ def print_tree(self): # Print everything output = "\n".join(lines) + print(output) - - return output def get_nodes_overview(self): """Gets a list of dictionary representations for all nodes in the graph.""" diff --git a/flowfile_core/tests/flowfile/test_code_generator.py b/flowfile_core/tests/flowfile/test_code_generator.py index 896a4353..e8048500 100644 --- a/flowfile_core/tests/flowfile/test_code_generator.py +++ b/flowfile_core/tests/flowfile/test_code_generator.py @@ -2379,7 +2379,7 @@ def test_aggregation_functions(): expected_df = flow.get_node(2).get_resulting_data().data_frame assert_frame_equal(result_df, expected_df, check_row_order=False) -def test_graph_tree(): +def test_graph_tree(systemout): """ Test made to the FlowGraph's print_tree method""" flow = create_basic_flow() @@ -2472,8 +2472,12 @@ def test_graph_tree(): expected_df = flow.get_node(6).get_resulting_data().data_frame assert_frame_equal(result_df, expected_df, check_row_order=False) + flow.print_tree() + stdout = systemout.readouterr() + tree_elements = ["(id=", ">", "1.", "Execution Order", "Flow Graph Visualization", "="] - assert all(element in flow.print_tree() for element in tree_elements) + for element in tree_elements: + assert element in stdout def test_flow_with_disconnected_nodes(): """Test a flow where some nodes might not be connected properly""" From e737cc7c91dcdc4c37ec29ed837e6ba2ba0406d3 Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Fri, 22 Aug 2025 10:46:48 +0100 Subject: [PATCH 41/43] Adding new graph_tree method with tests --- flowfile_core/tests/flowfile/test_code_generator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowfile_core/tests/flowfile/test_code_generator.py b/flowfile_core/tests/flowfile/test_code_generator.py index e8048500..545bff98 100644 --- a/flowfile_core/tests/flowfile/test_code_generator.py +++ b/flowfile_core/tests/flowfile/test_code_generator.py @@ -2473,7 +2473,7 @@ def test_graph_tree(systemout): assert_frame_equal(result_df, expected_df, check_row_order=False) flow.print_tree() - stdout = systemout.readouterr() + stdout = systemout.readouterr().out tree_elements = ["(id=", ">", "1.", "Execution Order", "Flow Graph Visualization", "="] for element in tree_elements: From 0f7167158dcc6e8abdd46a7fac518503b5ec78bc Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Fri, 22 Aug 2025 10:48:20 +0100 Subject: [PATCH 42/43] Adding new graph_tree method with tests --- flowfile_core/tests/flowfile/test_code_generator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flowfile_core/tests/flowfile/test_code_generator.py b/flowfile_core/tests/flowfile/test_code_generator.py index 545bff98..b4274aa3 100644 --- a/flowfile_core/tests/flowfile/test_code_generator.py +++ b/flowfile_core/tests/flowfile/test_code_generator.py @@ -2379,7 +2379,7 @@ def test_aggregation_functions(): expected_df = flow.get_node(2).get_resulting_data().data_frame assert_frame_equal(result_df, expected_df, check_row_order=False) -def test_graph_tree(systemout): +def test_graph_tree(capsys): """ Test made to the FlowGraph's print_tree method""" flow = create_basic_flow() @@ -2473,7 +2473,7 @@ def test_graph_tree(systemout): assert_frame_equal(result_df, expected_df, check_row_order=False) flow.print_tree() - stdout = systemout.readouterr().out + stdout = capsys.readouterr().out tree_elements = ["(id=", ">", "1.", "Execution Order", "Flow Graph Visualization", "="] for element in tree_elements: From eb3f47c0cdc934302a0787898bf55702654e70bc Mon Sep 17 00:00:00 2001 From: Bernardo Fernandes Date: Fri, 22 Aug 2025 13:09:14 +0100 Subject: [PATCH 43/43] Refactoring graph_tree method --- .../flowfile_core/flowfile/flow_graph.py | 158 ++------------- .../flowfile_core/flowfile/util/graph_tree.py | 189 +++++++++++++++++- 2 files changed, 206 insertions(+), 141 deletions(-) diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py index 6d773c64..8883e467 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_graph.py +++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py @@ -34,7 +34,7 @@ from flowfile_core.flowfile.analytics.utils import create_graphic_walker_node_from_node_promise from flowfile_core.flowfile.flow_node.flow_node import FlowNode from flowfile_core.flowfile.util.execution_orderer import compute_execution_plan -from flowfile_core.flowfile.util.graph_tree import calculate_depth, trace_path +from flowfile_core.flowfile.util.graph_tree import add_undrawn_nodes, build_flow_paths, build_node_info, calculate_depth, define_node_connections, draw_merged_paths, draw_standalone_paths, group_nodes_by_depth, trace_path from flowfile_core.flowfile.flow_data_engine.polars_code_parser import polars_code_parser from flowfile_core.flowfile.flow_data_engine.subprocess_operations.subprocess_operations import (ExternalDatabaseFetcher, ExternalDatabaseWriter, @@ -321,50 +321,14 @@ def print_tree(self): return # Build node information - node_info = {} - for node in self.nodes: - node_id = node.node_id - - # Get node label - operation = node.node_type.replace("_", " ").title() if node.node_type else "Unknown" - label = f"{operation} (id={node_id})" - if hasattr(node, 'setting_input') and hasattr(node.setting_input, 'description'): - if node.setting_input.description: - desc = node.setting_input.description - if len(desc) > 20: # Truncate long descriptions - desc = desc[:17] + "..." - label = f"{operation} ({node_id}): {desc}" - - # Get inputs and outputs - inputs = { - 'main': [n.node_id for n in (node.node_inputs.main_inputs or [])], - 'left': node.node_inputs.left_input.node_id if node.node_inputs.left_input else None, - 'right': node.node_inputs.right_input.node_id if node.node_inputs.right_input else None - } - outputs = [n.node_id for n in node.leads_to_nodes] - - node_info[node_id] = { - 'label': label, - 'short_label': f"{operation} ({node_id})", - 'inputs': inputs, - 'outputs': outputs, - 'depth': 0 - } + node_info = build_node_info(self.nodes) - # Calculate depths for all nodes for node_id in node_info: calculate_depth(node_id, node_info) # Group nodes by depth - depth_groups = {} - max_depth = 0 - for node_id, info in node_info.items(): - depth = info['depth'] - max_depth = max(max_depth, depth) - if depth not in depth_groups: - depth_groups[depth] = [] - depth_groups[depth].append(node_id) + depth_groups, max_depth = group_nodes_by_depth(node_info) # Sort nodes within each depth group for depth in depth_groups: @@ -378,28 +342,11 @@ def print_tree(self): lines.append("") # Track which nodes connect to what - merge_points = {} # target_id -> list of source_ids - for node_id, info in node_info.items(): - for output_id in info['outputs']: - if output_id not in merge_points: - merge_points[output_id] = [] - merge_points[output_id].append(node_id) + merge_points = define_node_connections(node_info) # Build the flow paths - paths = [] # List of paths through the graph - visited_in_paths = set() - - # Find all root nodes (no inputs) - root_nodes = [nid for nid, info in node_info.items() - if not info['inputs']['main'] and not info['inputs']['left'] and not info['inputs']['right']] - - if not root_nodes and self._flow_starts: - root_nodes = [n.node_id for n in self._flow_starts] - - # Get all paths - for root_id in root_nodes: - paths.extend(trace_path(root_id, node_info, merge_points)) - + paths = build_flow_paths(node_info,self._flow_starts, merge_points) + # Find the maximum label length for each depth level max_label_length = {} for depth in range(max_depth + 1): @@ -415,6 +362,10 @@ def print_tree(self): paths_by_merge = {} standalone_paths = [] + #Build flow paths + paths = build_flow_paths(node_info, self._flow_starts, merge_points) + + # Define paths to merge and standalone paths for path in paths: if len(path) > 1 and path[-1] in merge_points and len(merge_points[path[-1]]) > 1: merge_id = path[-1] @@ -423,88 +374,15 @@ def print_tree(self): paths_by_merge[merge_id].append(path) else: standalone_paths.append(path) - + # Draw merged paths - for merge_id, merge_paths in paths_by_merge.items(): - if merge_id in merge_drawn: - continue - - merge_info = node_info[merge_id] - sources = merge_points[merge_id] - - # Draw each source path leading to the merge - for i, source_id in enumerate(sources): - # Find the path containing this source - source_path = None - for path in merge_paths: - if source_id in path: - source_path = path[:path.index(source_id) + 1] - break - - if source_path: - # Build the line for this path - line_parts = [] - for j, nid in enumerate(source_path): - if j == 0: - line_parts.append(node_info[nid]['label']) - else: - line_parts.append(f" ──> {node_info[nid]['short_label']}") - - # Add the merge arrow - if i == 0: - # First source - line = "".join(line_parts) + " ─────┐" - lines.append(line) - elif i == len(sources) - 1: - # Last source - line = "".join(line_parts) + " ─────┴──> " + merge_info['label'] - lines.append(line) - - # Continue with the rest of the path after merge - remaining = node_info[merge_id]['outputs'] - while remaining: - next_id = remaining[0] - lines[-1] += f" ──> {node_info[next_id]['label']}" - remaining = node_info[next_id]['outputs'] - drawn_nodes.add(next_id) - else: - # Middle sources - line = "".join(line_parts) + " ─────┤" - lines.append(line) - - for nid in source_path: - drawn_nodes.add(nid) - - drawn_nodes.add(merge_id) - merge_drawn.add(merge_id) - lines.append("") # Add spacing between merge groups - - # Draw standalone paths - for path in standalone_paths: - if all(nid in drawn_nodes for nid in path): - continue - - line_parts = [] - for i, node_id in enumerate(path): - if node_id not in drawn_nodes: - if i == 0: - line_parts.append(node_info[node_id]['label']) - else: - line_parts.append(f" ──> {node_info[node_id]['short_label']}") - drawn_nodes.add(node_id) - - if line_parts: - lines.append("".join(line_parts)) - - # Add any remaining undrawn nodes - for node_id in node_info: - if node_id not in drawn_nodes: - lines.append(node_info[node_id]['label'] + " (isolated)") - - lines.append("") - lines.append("=" * 80) - lines.append("Execution Order") - lines.append("=" * 80) + draw_merged_paths(node_info, merge_points, paths_by_merge,merge_drawn, drawn_nodes, lines) + + # Draw standlone paths + draw_standalone_paths(drawn_nodes, standalone_paths, lines, node_info) + + # Add undrawn nodes + add_undrawn_nodes(drawn_nodes, node_info, lines) try: skip_nodes, ordered_nodes = compute_execution_plan(nodes=self.nodes,flow_starts=self._flow_starts+self.get_implicit_starter_nodes()) diff --git a/flowfile_core/flowfile_core/flowfile/util/graph_tree.py b/flowfile_core/flowfile_core/flowfile/util/graph_tree.py index 146cba81..9231ff0c 100644 --- a/flowfile_core/flowfile_core/flowfile/util/graph_tree.py +++ b/flowfile_core/flowfile_core/flowfile/util/graph_tree.py @@ -1,6 +1,8 @@ # Calculate depth for each node def calculate_depth(node_id, node_info, visited=None): + """Calculates the depth of each node.""" + if visited is None: visited = set() if node_id in visited: @@ -23,6 +25,7 @@ def calculate_depth(node_id, node_info, visited=None): # Trace paths from each root def trace_path(node_id, node_info, merge_points, current_path=None): + """Define the trace of each node path""" if current_path is None: current_path = [] @@ -43,4 +46,188 @@ def trace_path(node_id, node_info, merge_points, current_path=None): # Continue the path all_paths.extend(trace_path(output_id, node_info, merge_points, current_path)) - return all_paths \ No newline at end of file + return all_paths + +def build_node_info(nodes): + """Builds node information used to construct the graph tree.""" + + node_info = {} + + for node in nodes: + node_id = node.node_id + + # Get node label + operation = node.node_type.replace("_", " ").title() if node.node_type else "Unknown" + label = f"{operation} (id={node_id})" + if hasattr(node, 'setting_input') and hasattr(node.setting_input, 'description'): + if node.setting_input.description: + desc = node.setting_input.description + if len(desc) > 20: # Truncate long descriptions + desc = desc[:17] + "..." + label = f"{operation} ({node_id}): {desc}" + + # Get inputs and outputs + inputs = { + 'main': [n.node_id for n in (node.node_inputs.main_inputs or [])], + 'left': node.node_inputs.left_input.node_id if node.node_inputs.left_input else None, + 'right': node.node_inputs.right_input.node_id if node.node_inputs.right_input else None + } + outputs = [n.node_id for n in node.leads_to_nodes] + + node_info[node_id] = { + 'label': label, + 'short_label': f"{operation} ({node_id})", + 'inputs': inputs, + 'outputs': outputs, + 'depth': 0 + } + + return node_info + +def group_nodes_by_depth(node_info:dict): + """Groups each node by depth""" + + depth_groups = {} + max_depth = 0 + for node_id, info in node_info.items(): + depth = info['depth'] + max_depth = max(max_depth, depth) + if depth not in depth_groups: + depth_groups[depth] = [] + depth_groups[depth].append(node_id) + + return depth_groups, max_depth + +def define_node_connections(node_info:dict): + """Defines node connections to merge""" + + merge_points = {} # target_id -> list of source_ids + for node_id, info in node_info.items(): + for output_id in info['outputs']: + if output_id not in merge_points: + merge_points[output_id] = [] + merge_points[output_id].append(node_id) + + return merge_points + +def build_flow_paths(node_info:dict, flow_starts, merge_points:dict): + """Build the flow paths to be drawn""" + paths = [] # List of paths through the graph + visited_in_paths = set() + + # Find all root nodes (no inputs) + root_nodes = [nid for nid, info in node_info.items() + if not info['inputs']['main'] and not info['inputs']['left'] and not info['inputs']['right']] + + if not root_nodes and flow_starts: + root_nodes = [n.node_id for n in flow_starts] + + # Get all paths + for root_id in root_nodes: + paths.extend(trace_path(root_id, node_info, merge_points)) + + return paths + +def group_paths(paths:list, merge_points:dict): + """Groups each node path.""" + paths_by_merge = {} + standalone_paths = [] + + for path in paths: + if len(path) > 1 and path[-1] in merge_points and len(merge_points[path[-1]]) > 1: + merge_id = path[-1] + if merge_id not in paths_by_merge: + paths_by_merge[merge_id] = [] + paths_by_merge[merge_id].append(path) + else: + standalone_paths.append(path) + return paths_by_merge, standalone_paths + +def draw_merged_paths(node_info:dict, merge_points:dict, paths_by_merge:dict, merge_drawn:set, drawn_nodes:set, lines:str): + """Draws paths for each node that merges.""" + for merge_id, merge_paths in paths_by_merge.items(): + if merge_id in merge_drawn: + continue + + merge_info = node_info[merge_id] + sources = merge_points[merge_id] + + # Draw each source path leading to the merge + for i, source_id in enumerate(sources): + # Find the path containing this source + source_path = None + for path in merge_paths: + if source_id in path: + source_path = path[:path.index(source_id) + 1] + break + + if source_path: + # Build the line for this path + line_parts = [] + for j, nid in enumerate(source_path): + if j == 0: + line_parts.append(node_info[nid]['label']) + else: + line_parts.append(f" ──> {node_info[nid]['short_label']}") + + # Add the merge arrow + if i == 0: + # First source + line = "".join(line_parts) + " ─────┐" + lines.append(line) + elif i == len(sources) - 1: + # Last source + line = "".join(line_parts) + " ─────┴──> " + merge_info['label'] + lines.append(line) + + # Continue with the rest of the path after merge + remaining = node_info[merge_id]['outputs'] + while remaining: + next_id = remaining[0] + lines[-1] += f" ──> {node_info[next_id]['label']}" + remaining = node_info[next_id]['outputs'] + drawn_nodes.add(next_id) + else: + # Middle sources + line = "".join(line_parts) + " ─────┤" + lines.append(line) + + for nid in source_path: + drawn_nodes.add(nid) + + drawn_nodes.add(merge_id) + merge_drawn.add(merge_id) + lines.append("") # Add spacing between merge groups + return paths_by_merge + +def draw_standalone_paths(drawn_nodes:set, standalone_paths:list, lines:str, node_info:dict): + """ Draws paths that do not merge.""" + # Draw standalone paths + for path in standalone_paths: + if all(nid in drawn_nodes for nid in path): + continue + + line_parts = [] + for i, node_id in enumerate(path): + if node_id not in drawn_nodes: + if i == 0: + line_parts.append(node_info[node_id]['label']) + else: + line_parts.append(f" ──> {node_info[node_id]['short_label']}") + drawn_nodes.add(node_id) + + if line_parts: + lines.append("".join(line_parts)) + +def add_undrawn_nodes(drawn_nodes:set, node_info:dict, lines:str): + """Adds isolated nodes if exists.""" + # Add any remaining undrawn nodes + for node_id in node_info: + if node_id not in drawn_nodes: + lines.append(node_info[node_id]['label'] + " (isolated)") + + lines.append("") + lines.append("=" * 80) + lines.append("Execution Order") + lines.append("=" * 80) +