From a1104120f7eb76a7b0342f2f1f8294e8665c4bc5 Mon Sep 17 00:00:00 2001
From: edwardvaneechoud
Date: Wed, 6 Aug 2025 21:33:37 +0200
Subject: [PATCH 1/2] Fixing handling of pivot

---
 .../flowfile_core/flowfile/flow_graph.py      |  9 ++--
 ...tings_validator.py => schema_callbacks.py} | 13 ++++--
 flowfile_core/tests/flowfile/test_flowfile.py | 43 ++++++++++++++++++-
 3 files changed, 55 insertions(+), 10 deletions(-)
 rename flowfile_core/flowfile_core/flowfile/{flow_data_engine/fuzzy_matching/settings_validator.py => schema_callbacks.py} (88%)

diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py
index 11770b1f..58896b44 100644
--- a/flowfile_core/flowfile_core/flowfile/flow_graph.py
+++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py
@@ -14,13 +14,14 @@
 from flowfile_core.configs.flow_logger import FlowLogger
 from flowfile_core.flowfile.sources.external_sources.factory import data_source_factory
 from flowfile_core.flowfile.flow_data_engine.flow_file_column.main import cast_str_to_polars_type, FlowfileColumn
-from flowfile_core.flowfile.flow_data_engine.fuzzy_matching.settings_validator import (calculate_fuzzy_match_schema,
-                                                                                       pre_calculate_pivot_schema)
+
 from flowfile_core.flowfile.flow_data_engine.cloud_storage_reader import CloudStorageReader
 from flowfile_core.utils.arrow_reader import get_read_top_n
 from flowfile_core.flowfile.flow_data_engine.flow_data_engine import FlowDataEngine, execute_polars_code
-from flowfile_core.flowfile.flow_data_engine.read_excel_tables import get_open_xlsx_datatypes, \
-    get_calamine_xlsx_data_types
+from flowfile_core.flowfile.flow_data_engine.read_excel_tables import (get_open_xlsx_datatypes,
+                                                                       get_calamine_xlsx_data_types)
+
+from flowfile_core.flowfile.schema_callbacks import (calculate_fuzzy_match_schema, pre_calculate_pivot_schema)
 from flowfile_core.flowfile.sources import external_sources
 from flowfile_core.schemas import input_schema, schemas, transform_schema
 from flowfile_core.schemas.output_model import TableExample, NodeData, NodeResult, RunInformation
diff --git a/flowfile_core/flowfile_core/flowfile/flow_data_engine/fuzzy_matching/settings_validator.py b/flowfile_core/flowfile_core/flowfile/schema_callbacks.py
similarity index 88%
rename from flowfile_core/flowfile_core/flowfile/flow_data_engine/fuzzy_matching/settings_validator.py
rename to flowfile_core/flowfile_core/flowfile/schema_callbacks.py
index a9405375..32baaa8a 100644
--- a/flowfile_core/flowfile_core/flowfile/flow_data_engine/fuzzy_matching/settings_validator.py
+++ b/flowfile_core/flowfile_core/flowfile/schema_callbacks.py
@@ -71,7 +71,8 @@ def pre_calculate_pivot_schema(node_input_schema: List[FlowfileColumn],
     val_column_schema = get_schema_of_column(node_input_schema, pivot_input.value_col)
     if output_fields is not None and len(output_fields) > 0:
         return index_columns_schema+[FlowfileColumn(PlType(column_name=output_field.name,
-                                                           pl_datatype=output_field.data_type)) for output_field in output_fields]
+                                                           pl_datatype=output_field.data_type)) for output_field in
+                                                    output_fields]
     else:
         max_unique_vals = 200
 
@@ -84,7 +85,11 @@ def pre_calculate_pivot_schema(node_input_schema: List[FlowfileColumn],
                            f' Max unique values: {max_unique_vals}')
     pl_output_fields = []
     for val in unique_vals:
-        for agg in pivot_input.aggregations:
-            output_type = get_output_data_type_pivot(val_column_schema, agg)
-            pl_output_fields.append(PlType(column_name=f'{val}_{agg}', pl_datatype=output_type))
+        if len(pivot_input.aggregations) == 1:
+            output_type = get_output_data_type_pivot(val_column_schema, pivot_input.aggregations[0])
+            pl_output_fields.append(PlType(column_name=str(val), pl_datatype=output_type))
+        else:
+            for agg in pivot_input.aggregations:
+                output_type = get_output_data_type_pivot(val_column_schema, agg)
+                pl_output_fields.append(PlType(column_name=f'{val}_{agg}', pl_datatype=output_type))
     return index_columns_schema + [FlowfileColumn(pl_output_field) for pl_output_field in pl_output_fields]
diff --git a/flowfile_core/tests/flowfile/test_flowfile.py b/flowfile_core/tests/flowfile/test_flowfile.py
index f24c10c3..57b3869f 100644
--- a/flowfile_core/tests/flowfile/test_flowfile.py
+++ b/flowfile_core/tests/flowfile/test_flowfile.py
@@ -11,6 +11,8 @@
                                                    delete_cloud_connection, get_all_cloud_connections_interface)
 from flowfile_core.database.connection import get_db_context
+from flowfile_core.flowfile.flow_data_engine.flow_file_column.main import FlowfileColumn
+from flowfile_core.flowfile.schema_callbacks import pre_calculate_pivot_schema
 
 import pytest
 from pathlib import Path
 
@@ -492,14 +494,51 @@ def test_add_pivot():
     pivot_settings = input_schema.NodePivot(flow_id=1, node_id=2, pivot_input=pivot_input)
     graph.add_pivot(pivot_settings)
     predicted_df = graph.get_node(2).get_predicted_resulting_data()
-    assert set(predicted_df.columns) == {'Country', '0_sum', '3_sum', '2_sum',
-                                         '1_sum'}, 'Columns should be Country, 0_sum, 3_sum, 2_sum, 1_sum'
+    assert set(predicted_df.columns) == {'Country', '0', '3', '2', '1'}, 'Columns should be Country, 0, 3, 2, 1'
     assert {'str', 'numeric', 'numeric', 'numeric', 'numeric'} == set(
         p.generic_datatype() for p in predicted_df.schema), 'Data types should be the same'
     run_info = graph.run_graph()
     handle_run_info(run_info)
 
 
+def test_pivot_schema_callback():
+    graph = create_graph()
+    input_data = (FlowDataEngine.create_random(10000).apply_flowfile_formula('random_int(0, 4)', 'groups')
+                  .select_columns(['groups', 'Country', 'sales_data']))
+    add_manual_input(graph, data=input_data.to_pylist())
+    add_node_promise_on_type(graph, 'pivot', 2)
+    connection = input_schema.NodeConnection.create_from_simple_input(1, 2)
+    add_connection(graph, connection)
+    pivot_input = transform_schema.PivotInput(pivot_column='groups', value_col='sales_data', index_columns=['Country'],
+                                              aggregations=['sum'])
+    pivot_settings = input_schema.NodePivot(flow_id=1, node_id=2, pivot_input=pivot_input)
+    graph.add_pivot(pivot_settings)
+
+
+def test_schema_callback_in_graph():
+    pivot_input = transform_schema.PivotInput(index_columns=['Country'], pivot_column='groups',
+                                              value_col='sales_data', aggregations=['sum'])
+
+    data = (FlowDataEngine.create_random(10000)
+            .apply_flowfile_formula('random_int(0, 4)', 'groups')
+            .select_columns(['groups', 'Country', 'Work', 'sales_data']))
+    node_input_schema = data.schema
+    input_lf = data.data_frame
+    result_schema = pre_calculate_pivot_schema(node_input_schema=node_input_schema,
+                                               pivot_input=pivot_input,
+                                               input_lf=input_lf)
+    result_data = FlowDataEngine.create_from_schema(result_schema)
+    expected_schema = [input_schema.MinimalFieldInfo(name="Country", data_type="String"),
+                       input_schema.MinimalFieldInfo(name='0', data_type='Float64'),
+                       input_schema.MinimalFieldInfo(name='1', data_type='Float64'),
+                       input_schema.MinimalFieldInfo(name='2', data_type='Float64'),
+                       input_schema.MinimalFieldInfo(name='3', data_type='Float64')]
+    expected_data = FlowDataEngine.create_from_schema([FlowfileColumn.create_from_minimal_field_info(mfi)
+                                                       for mfi in expected_schema])
+    result_data.assert_equal(expected_data)
+
+
+
 def test_add_pivot_string_count():
     graph = create_graph()
     input_data = (FlowDataEngine.create_random(10000)

From 45a0655906ab31dffa5d9bb0156d8b46b84f2af3 Mon Sep 17 00:00:00 2001
From: edwardvaneechoud
Date: Fri, 15 Aug 2025 12:05:02 +0200
Subject: [PATCH 2/2] Fixing test for pivot table

---
 .gitignore                                    |  3 ++-
 flowfile_core/tests/flowfile/test_flowfile.py | 10 +++++-----
 local_data/local_products.parquet             | Bin 1999 -> 0 bytes
 mkdocs.yml                                    |  3 +++
 4 files changed, 10 insertions(+), 6 deletions(-)
 delete mode 100644 local_data/local_products.parquet

diff --git a/.gitignore b/.gitignore
index c6d50fa8..cd336a0d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -49,4 +49,5 @@ saved_flows/
 /test_utils/postgres/postgres-docker-samples/
 **/test_flowfile.db
 /flowfile/flowfile/web/static/
-flowfile.db
\ No newline at end of file
+flowfile.db
+local_data
\ No newline at end of file
diff --git a/flowfile_core/tests/flowfile/test_flowfile.py b/flowfile_core/tests/flowfile/test_flowfile.py
index 47f99f4f..66aac153 100644
--- a/flowfile_core/tests/flowfile/test_flowfile.py
+++ b/flowfile_core/tests/flowfile/test_flowfile.py
@@ -572,8 +572,8 @@ def test_add_pivot_string_count():
     pivot_settings = input_schema.NodePivot(flow_id=1, node_id=2, pivot_input=pivot_input)
     graph.add_pivot(pivot_settings)
     predicted_df = graph.get_node(2).get_predicted_resulting_data()
-    assert set(predicted_df.columns) == {'Country', '0_count', '3_count', '2_count',
-                                         '1_count'}, 'Columns should be Country, 0_count, 3_count, 2_count, 1_count'
+    assert set(predicted_df.columns) == {'Country', '0', '3', '2',
+                                         '1'}, 'Columns should be Country, 0, 3, 2, 1'
     assert {'str', 'numeric', 'numeric', 'numeric', 'numeric'} == set(
         p.generic_datatype() for p in predicted_df.schema), 'Data types should be the same'
     run_info = graph.run_graph()
@@ -594,8 +594,8 @@ def test_add_pivot_string_concat():
     pivot_settings = input_schema.NodePivot(flow_id=1, node_id=2, pivot_input=pivot_input)
     graph.add_pivot(pivot_settings)
     predicted_df = graph.get_node(2).get_predicted_resulting_data()
-    assert set(predicted_df.columns) == {'Country', '0_concat', '3_concat', '2_concat',
-                                         '1_concat'}, 'Columns should be Country, 0_concat, 3_concat, 2_concat, 1_concat'
+    assert set(predicted_df.columns) == {'Country', '0', '3', '2',
+                                         '1'}, 'Columns should be Country, 0, 3, 2, 1'
     assert {'str'} == set(p.generic_datatype() for p in predicted_df.schema), 'Data types should be the same'
     run_info = graph.run_graph()
     handle_run_info(run_info)
@@ -616,7 +616,7 @@ def test_try_add_to_big_pivot():
     pivot_settings = input_schema.NodePivot(flow_id=1, node_id=2, pivot_input=pivot_input)
     graph.add_pivot(pivot_settings)
     predicted_df = graph.get_node(2).get_predicted_resulting_data()
-    expected_columns = ['Country'] + [f'{i + 1}_sum' for i in range(200)]
+    expected_columns = ['Country'] + [f'{i + 1}' for i in range(200)]
     assert set(predicted_df.columns) == set(expected_columns), 'Should not have calculated the columns'
     run_info = graph.run_graph()
     handle_run_info(run_info)
diff --git a/local_data/local_products.parquet b/local_data/local_products.parquet
deleted file mode 100644
index 067b839aadbd41c23bceec4a8ac80ef15233e0f8..0000000000000000000000000000000000000000
GIT binary patch
literal 0
HcmV?d00001

literal 1999
[base85-encoded binary payload omitted]
diff --git a/mkdocs.yml b/mkdocs.yml
index f76b7234..36e925f1 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -3,6 +3,8 @@
 site_name: Flowfile
 site_description: A visual ETL tool combining drag-and-drop workflow building with Polars dataframes
 site_url: https://edwardvaneechoud.github.io/Flowfile/
+repo_url: https://github.com/edwardvaneechoud/Flowfile
+
 
 # Explicitly set the URL style for consistency and planning
 use_directory_urls: false
@@ -67,6 +69,7 @@ markdown_extensions:
       - name: mermaid
         class: mermaid
         format: !!python/name:pymdownx.superfences.fence_code_format
+  - footnotes
 
 
 # MkDocs Navigation Structure
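
Note for reviewers: the behavioural change in pre_calculate_pivot_schema reduces to one column-naming rule, which the updated test expectations confirm. With exactly one aggregation, each predicted pivot column is named after the bare pivot value; with several aggregations, each value is suffixed with the aggregation name. A minimal sketch of that rule (the helper name is hypothetical; the real code wraps every name in a PlType whose datatype comes from get_output_data_type_pivot):

    from typing import Any, List

    def pivot_output_names(unique_vals: List[Any], aggregations: List[str]) -> List[str]:
        # Hypothetical helper mirroring the naming rule this patch encodes;
        # it returns bare column names only, without datatype information.
        if len(aggregations) == 1:
            # One aggregation: the pivoted value itself becomes the column name.
            return [str(val) for val in unique_vals]
        # Several aggregations: disambiguate with a '<value>_<agg>' suffix.
        return [f'{val}_{agg}' for val in unique_vals for agg in aggregations]

    # Matches the expectations asserted in the updated tests above:
    assert pivot_output_names([0, 1, 2, 3], ['sum']) == ['0', '1', '2', '3']
    assert pivot_output_names([0, 1], ['sum', 'count']) == ['0_sum', '0_count', '1_sum', '1_count']

This is why every single-aggregation assertion drops the _sum, _count, and _concat suffixes: the predicted schema now lines up with the columns the tests assert the pivot produces.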