diff --git a/.gitignore b/.gitignore
index c6d50fa8..cd336a0d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -49,4 +49,5 @@ saved_flows/
 /test_utils/postgres/postgres-docker-samples/
 **/test_flowfile.db
 /flowfile/flowfile/web/static/
-flowfile.db
\ No newline at end of file
+flowfile.db
+local_data
\ No newline at end of file
diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py
index 62e05556..a3c73ab3 100644
--- a/flowfile_core/flowfile_core/flowfile/flow_graph.py
+++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py
@@ -15,13 +15,14 @@ from flowfile_core.configs.flow_logger import FlowLogger
 from flowfile_core.flowfile.sources.external_sources.factory import data_source_factory
 from flowfile_core.flowfile.flow_data_engine.flow_file_column.main import cast_str_to_polars_type, FlowfileColumn
-from flowfile_core.flowfile.flow_data_engine.fuzzy_matching.settings_validator import (calculate_fuzzy_match_schema,
-                                                                                       pre_calculate_pivot_schema)
+
 from flowfile_core.flowfile.flow_data_engine.cloud_storage_reader import CloudStorageReader
 from flowfile_core.utils.arrow_reader import get_read_top_n
 from flowfile_core.flowfile.flow_data_engine.flow_data_engine import FlowDataEngine, execute_polars_code
-from flowfile_core.flowfile.flow_data_engine.read_excel_tables import get_open_xlsx_datatypes, \
-    get_calamine_xlsx_data_types
+from flowfile_core.flowfile.flow_data_engine.read_excel_tables import (get_open_xlsx_datatypes,
+                                                                       get_calamine_xlsx_data_types)
+
+from flowfile_core.flowfile.schema_callbacks import (calculate_fuzzy_match_schema, pre_calculate_pivot_schema)
 from flowfile_core.flowfile.sources import external_sources
 from flowfile_core.schemas import input_schema, schemas, transform_schema
 from flowfile_core.schemas.output_model import NodeData, NodeResult, RunInformation
diff --git a/flowfile_core/flowfile_core/flowfile/flow_data_engine/fuzzy_matching/settings_validator.py b/flowfile_core/flowfile_core/flowfile/schema_callbacks.py
similarity index 88%
rename from flowfile_core/flowfile_core/flowfile/flow_data_engine/fuzzy_matching/settings_validator.py
rename to flowfile_core/flowfile_core/flowfile/schema_callbacks.py
index a9405375..32baaa8a 100644
--- a/flowfile_core/flowfile_core/flowfile/flow_data_engine/fuzzy_matching/settings_validator.py
+++ b/flowfile_core/flowfile_core/flowfile/schema_callbacks.py
@@ -71,7 +71,8 @@ def pre_calculate_pivot_schema(node_input_schema: List[FlowfileColumn],
     val_column_schema = get_schema_of_column(node_input_schema, pivot_input.value_col)
     if output_fields is not None and len(output_fields) > 0:
         return index_columns_schema+[FlowfileColumn(PlType(column_name=output_field.name,
-                                                           pl_datatype=output_field.data_type)) for output_field in output_fields]
+                                                           pl_datatype=output_field.data_type)) for output_field in
+                output_fields]
     else:
         max_unique_vals = 200
@@ -84,7 +85,11 @@ def pre_calculate_pivot_schema(node_input_schema: List[FlowfileColumn],
                              f' Max unique values: {max_unique_vals}')
     pl_output_fields = []
     for val in unique_vals:
-        for agg in pivot_input.aggregations:
-            output_type = get_output_data_type_pivot(val_column_schema, agg)
-            pl_output_fields.append(PlType(column_name=f'{val}_{agg}', pl_datatype=output_type))
+        if len(pivot_input.aggregations) == 1:
+            output_type = get_output_data_type_pivot(val_column_schema, pivot_input.aggregations[0])
+            pl_output_fields.append(PlType(column_name=str(val), pl_datatype=output_type))
+        else:
+            for agg in pivot_input.aggregations:
+                output_type = get_output_data_type_pivot(val_column_schema, agg)
+                pl_output_fields.append(PlType(column_name=f'{val}_{agg}', pl_datatype=output_type))
     return index_columns_schema + [FlowfileColumn(pl_output_field) for pl_output_field in pl_output_fields]
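The functional change in `pre_calculate_pivot_schema` above: when exactly one aggregation is configured, the predicted pivot columns are named after the pivot value alone (`'0'`, `'1'`, …) instead of `'{value}_{agg}'`, which lines up with how Polars itself names pivot output for a single aggregation. A minimal sketch of just that naming rule, using a hypothetical helper (the real code wraps this in `PlType`/`FlowfileColumn` objects):

```python
# Hypothetical helper mirroring only the branching logic of the diff above.
def pivot_column_names(unique_vals: list, aggregations: list) -> list:
    names = []
    for val in unique_vals:
        if len(aggregations) == 1:
            # Single aggregation: the pivot value itself is the column name.
            names.append(str(val))
        else:
            # Multiple aggregations: suffix each value with the aggregation.
            names.extend(f'{val}_{agg}' for agg in aggregations)
    return names


assert pivot_column_names([0, 1, 2, 3], ['sum']) == ['0', '1', '2', '3']
assert pivot_column_names([0, 1], ['sum', 'count']) == ['0_sum', '0_count', '1_sum', '1_count']
```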
diff --git a/flowfile_core/tests/flowfile/test_flowfile.py b/flowfile_core/tests/flowfile/test_flowfile.py
index 4dfb65a8..66aac153 100644
--- a/flowfile_core/tests/flowfile/test_flowfile.py
+++ b/flowfile_core/tests/flowfile/test_flowfile.py
@@ -11,6 +11,8 @@
                                                  delete_cloud_connection, get_all_cloud_connections_interface)
 from flowfile_core.database.connection import get_db_context
+from flowfile_core.flowfile.flow_data_engine.flow_file_column.main import FlowfileColumn
+from flowfile_core.flowfile.schema_callbacks import pre_calculate_pivot_schema
 
 import pytest
 from pathlib import Path
@@ -511,14 +513,51 @@ def test_add_pivot():
     pivot_settings = input_schema.NodePivot(flow_id=1, node_id=2, pivot_input=pivot_input)
     graph.add_pivot(pivot_settings)
     predicted_df = graph.get_node(2).get_predicted_resulting_data()
-    assert set(predicted_df.columns) == {'Country', '0_sum', '3_sum', '2_sum',
-                                         '1_sum'}, 'Columns should be Country, 0_sum, 3_sum, 2_sum, 1_sum'
+    assert set(predicted_df.columns) == {'Country', '0', '3', '2', '1'}, 'Columns should be Country, 0, 3, 2, 1'
     assert {'str', 'numeric', 'numeric', 'numeric', 'numeric'} == set(
         p.generic_datatype() for p in predicted_df.schema), 'Data types should be the same'
     run_info = graph.run_graph()
     handle_run_info(run_info)
 
 
+def test_pivot_schema_callback():
+    graph = create_graph()
+    input_data = (FlowDataEngine.create_random(10000).apply_flowfile_formula('random_int(0, 4)', 'groups')
+                  .select_columns(['groups', 'Country', 'sales_data']))
+    add_manual_input(graph, data=input_data.to_pylist())
+    add_node_promise_on_type(graph, 'pivot', 2)
+    connection = input_schema.NodeConnection.create_from_simple_input(1, 2)
+    add_connection(graph, connection)
+    pivot_input = transform_schema.PivotInput(pivot_column='groups', value_col='sales_data', index_columns=['Country'],
+                                              aggregations=['sum'])
+    pivot_settings = input_schema.NodePivot(flow_id=1, node_id=2, pivot_input=pivot_input)
+    graph.add_pivot(pivot_settings)
+
+
+def test_schema_callback_in_graph():
+    pivot_input = transform_schema.PivotInput(index_columns=['Country'], pivot_column='groups',
+                                              value_col='sales_data', aggregations=['sum'])
+
+    data = (FlowDataEngine.create_random(10000)
+            .apply_flowfile_formula('random_int(0, 4)', 'groups')
+            .select_columns(['groups', 'Country', 'Work', 'sales_data']))
+    node_input_schema = data.schema
+    input_lf = data.data_frame
+    result_schema = pre_calculate_pivot_schema(node_input_schema=node_input_schema,
+                                               pivot_input=pivot_input,
+                                               input_lf=input_lf)
+    result_data = FlowDataEngine.create_from_schema(result_schema)
+    expected_schema = [input_schema.MinimalFieldInfo(name="Country", data_type="String"),
+                       input_schema.MinimalFieldInfo(name='0', data_type='Float64'),
+                       input_schema.MinimalFieldInfo(name='1', data_type='Float64'),
+                       input_schema.MinimalFieldInfo(name='2', data_type='Float64'),
+                       input_schema.MinimalFieldInfo(name='3', data_type='Float64')]
+    expected_data = FlowDataEngine.create_from_schema([FlowfileColumn.create_from_minimal_field_info(mfi)
+                                                       for mfi in expected_schema])
+    result_data.assert_equal(expected_data)
+
+
 def test_add_pivot_string_count():
     graph = create_graph()
     input_data = (FlowDataEngine.create_random(10000)
                   .apply_flowfile_formula('random_int(0, 4)', 'groups')
@@ -533,8 +572,8 @@ def test_add_pivot_string_count():
     pivot_settings = input_schema.NodePivot(flow_id=1, node_id=2, pivot_input=pivot_input)
     graph.add_pivot(pivot_settings)
     predicted_df = graph.get_node(2).get_predicted_resulting_data()
-    assert set(predicted_df.columns) == {'Country', '0_count', '3_count', '2_count',
-                                         '1_count'}, 'Columns should be Country, 0_count, 3_count, 2_count, 1_count'
+    assert set(predicted_df.columns) == {'Country', '0', '3', '2',
+                                         '1'}, 'Columns should be Country, 0, 3, 2, 1'
     assert {'str', 'numeric', 'numeric', 'numeric', 'numeric'} == set(
         p.generic_datatype() for p in predicted_df.schema), 'Data types should be the same'
     run_info = graph.run_graph()
@@ -555,8 +594,8 @@ def test_add_pivot_string_concat():
     pivot_settings = input_schema.NodePivot(flow_id=1, node_id=2, pivot_input=pivot_input)
     graph.add_pivot(pivot_settings)
     predicted_df = graph.get_node(2).get_predicted_resulting_data()
-    assert set(predicted_df.columns) == {'Country', '0_concat', '3_concat', '2_concat',
-                                         '1_concat'}, 'Columns should be Country, 0_concat, 3_concat, 2_concat, 1_concat'
+    assert set(predicted_df.columns) == {'Country', '0', '3', '2',
+                                         '1'}, 'Columns should be Country, 0, 3, 2, 1'
     assert {'str'} == set(p.generic_datatype() for p in predicted_df.schema), 'Data types should be the same'
     run_info = graph.run_graph()
     handle_run_info(run_info)
@@ -577,7 +616,7 @@ def test_try_add_to_big_pivot():
     pivot_settings = input_schema.NodePivot(flow_id=1, node_id=2, pivot_input=pivot_input)
     graph.add_pivot(pivot_settings)
     predicted_df = graph.get_node(2).get_predicted_resulting_data()
-    expected_columns = ['Country'] + [f'{i + 1}_sum' for i in range(200)]
+    expected_columns = ['Country'] + [f'{i + 1}' for i in range(200)]
     assert set(predicted_df.columns) == set(expected_columns), 'Should not have calculated the columns'
     run_info = graph.run_graph()
     handle_run_info(run_info)
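The test updates all track the same rename: single-aggregation pivots now predict plain value names. As a sanity check outside the graph machinery, plain Polars produces the same naming (a sketch assuming the Polars 1.x `pivot` signature; the data is illustrative):

```python
import polars as pl

# With a single aggregation, Polars names the pivoted columns after the
# pivot values themselves, matching what the updated schema callback predicts.
df = pl.DataFrame({
    "Country": ["NL", "NL", "US", "US"],
    "groups": [0, 1, 0, 1],
    "sales_data": [1.0, 2.0, 3.0, 4.0],
})
pivoted = df.pivot(on="groups", index="Country", values="sales_data",
                   aggregate_function="sum")
assert set(pivoted.columns) == {"Country", "0", "1"}
```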
diff --git a/local_data/local_products.parquet b/local_data/local_products.parquet
deleted file mode 100644
index 067b839a..00000000
Binary files a/local_data/local_products.parquet and /dev/null differ
diff --git a/mkdocs.yml b/mkdocs.yml
index f76b7234..36e925f1 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -3,6 +3,8 @@ site_name: Flowfile
 site_description: A visual ETL tool combining drag-and-drop workflow building with Polars dataframes
 site_url: https://edwardvaneechoud.github.io/Flowfile/
 
+repo_url: https://github.com/edwardvaneechoud/Flowfile
+
 # Explicitly set the URL style for consistency and planning
 use_directory_urls: false
@@ -67,6 +69,7 @@ markdown_extensions:
           - name: mermaid
             class: mermaid
             format: !!python/name:pymdownx.superfences.fence_code_format
+  - footnotes
 
 # MkDocs Navigation Structure