.gitignore (3 changes: 2 additions & 1 deletion)

@@ -49,4 +49,5 @@ saved_flows/
 /test_utils/postgres/postgres-docker-samples/
 **/test_flowfile.db
 /flowfile/flowfile/web/static/
-flowfile.db
+flowfile.db
+local_data
flowfile_core/flowfile_core/flowfile/flow_graph.py (9 changes: 5 additions & 4 deletions)

@@ -15,13 +15,14 @@
 from flowfile_core.configs.flow_logger import FlowLogger
 from flowfile_core.flowfile.sources.external_sources.factory import data_source_factory
 from flowfile_core.flowfile.flow_data_engine.flow_file_column.main import cast_str_to_polars_type, FlowfileColumn
-from flowfile_core.flowfile.flow_data_engine.fuzzy_matching.settings_validator import (calculate_fuzzy_match_schema,
-                                                                                       pre_calculate_pivot_schema)
+
 from flowfile_core.flowfile.flow_data_engine.cloud_storage_reader import CloudStorageReader
 from flowfile_core.utils.arrow_reader import get_read_top_n
 from flowfile_core.flowfile.flow_data_engine.flow_data_engine import FlowDataEngine, execute_polars_code
-from flowfile_core.flowfile.flow_data_engine.read_excel_tables import get_open_xlsx_datatypes, \
-    get_calamine_xlsx_data_types
+from flowfile_core.flowfile.flow_data_engine.read_excel_tables import (get_open_xlsx_datatypes,
+                                                                       get_calamine_xlsx_data_types)
+
+from flowfile_core.flowfile.schema_callbacks import (calculate_fuzzy_match_schema, pre_calculate_pivot_schema)
 from flowfile_core.flowfile.sources import external_sources
 from flowfile_core.schemas import input_schema, schemas, transform_schema
 from flowfile_core.schemas.output_model import NodeData, NodeResult, RunInformation
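
For downstream code that still imports these helpers from the old module, the migration is a one-line import swap. A minimal sketch — only the module paths shown in this diff are confirmed:

```python
# Before this PR (removed):
# from flowfile_core.flowfile.flow_data_engine.fuzzy_matching.settings_validator import (
#     calculate_fuzzy_match_schema, pre_calculate_pivot_schema)

# After this PR, the schema helpers live in a dedicated module:
from flowfile_core.flowfile.schema_callbacks import (
    calculate_fuzzy_match_schema,
    pre_calculate_pivot_schema,
)
```
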
@@ -71,7 +71,8 @@ def pre_calculate_pivot_schema(node_input_schema: List[FlowfileColumn],
     val_column_schema = get_schema_of_column(node_input_schema, pivot_input.value_col)
     if output_fields is not None and len(output_fields) > 0:
         return index_columns_schema+[FlowfileColumn(PlType(column_name=output_field.name,
-                                                           pl_datatype=output_field.data_type)) for output_field in output_fields]
+                                                           pl_datatype=output_field.data_type)) for output_field in
+                                     output_fields]
 
     else:
         max_unique_vals = 200
@@ -84,7 +85,11 @@
                                f' Max unique values: {max_unique_vals}')
         pl_output_fields = []
         for val in unique_vals:
-            for agg in pivot_input.aggregations:
-                output_type = get_output_data_type_pivot(val_column_schema, agg)
-                pl_output_fields.append(PlType(column_name=f'{val}_{agg}', pl_datatype=output_type))
+            if len(pivot_input.aggregations) == 1:
+                output_type = get_output_data_type_pivot(val_column_schema, pivot_input.aggregations[0])
+                pl_output_fields.append(PlType(column_name=str(val), pl_datatype=output_type))
+            else:
+                for agg in pivot_input.aggregations:
+                    output_type = get_output_data_type_pivot(val_column_schema, agg)
+                    pl_output_fields.append(PlType(column_name=f'{val}_{agg}', pl_datatype=output_type))
         return index_columns_schema + [FlowfileColumn(pl_output_field) for pl_output_field in pl_output_fields]
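
This hunk is the substantive change of the PR: with exactly one aggregation configured, the predicted pivot columns are now named after the pivot values alone ('0', '1', ...), matching what a single-aggregation pivot actually produces; the '{value}_{agg}' suffix convention is kept only when several aggregations are configured. A standalone sketch of the naming rule, assuming nothing beyond what the hunk shows — the `PivotSpec` stand-in and helper below are hypothetical, for illustration only:

```python
from dataclasses import dataclass
from typing import List


@dataclass
class PivotSpec:
    """Hypothetical stand-in for transform_schema.PivotInput (illustration only)."""
    aggregations: List[str]


def predicted_pivot_column_names(unique_vals: List[str], spec: PivotSpec) -> List[str]:
    """Mirrors the naming branch introduced in the hunk above."""
    names: List[str] = []
    for val in unique_vals:
        if len(spec.aggregations) == 1:
            # Single aggregation: the column is named after the pivot value alone.
            names.append(str(val))
        else:
            # Multiple aggregations: disambiguate with a '{value}_{agg}' suffix.
            names.extend(f"{val}_{agg}" for agg in spec.aggregations)
    return names


print(predicted_pivot_column_names(["0", "1"], PivotSpec(aggregations=["sum"])))
# -> ['0', '1']
print(predicted_pivot_column_names(["0", "1"], PivotSpec(aggregations=["sum", "count"])))
# -> ['0_sum', '0_count', '1_sum', '1_count']
```
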
flowfile_core/tests/flowfile/test_flowfile.py (53 changes: 46 additions & 7 deletions)
@@ -11,6 +11,8 @@
     delete_cloud_connection,
     get_all_cloud_connections_interface)
 from flowfile_core.database.connection import get_db_context
+from flowfile_core.flowfile.flow_data_engine.flow_file_column.main import FlowfileColumn
+from flowfile_core.flowfile.schema_callbacks import pre_calculate_pivot_schema
 
 import pytest
 from pathlib import Path
@@ -511,14 +513,51 @@ def test_add_pivot():
     pivot_settings = input_schema.NodePivot(flow_id=1, node_id=2, pivot_input=pivot_input)
     graph.add_pivot(pivot_settings)
     predicted_df = graph.get_node(2).get_predicted_resulting_data()
-    assert set(predicted_df.columns) == {'Country', '0_sum', '3_sum', '2_sum',
-                                         '1_sum'}, 'Columns should be Country, 0_sum, 3_sum, 2_sum, 1_sum'
+    assert set(predicted_df.columns) == {'Country', '0', '3', '2', '1'}, 'Columns should be Country, 0, 3, 2, 1'
     assert {'str', 'numeric', 'numeric', 'numeric', 'numeric'} == set(
         p.generic_datatype() for p in predicted_df.schema), 'Data types should be the same'
     run_info = graph.run_graph()
     handle_run_info(run_info)
 
+
+def test_pivot_schema_callback():
+    graph = create_graph()
+    input_data = (FlowDataEngine.create_random(10000).apply_flowfile_formula('random_int(0, 4)', 'groups')
+                  .select_columns(['groups', 'Country', 'sales_data']))
+    add_manual_input(graph, data=input_data.to_pylist())
+    add_node_promise_on_type(graph, 'pivot', 2)
+    connection = input_schema.NodeConnection.create_from_simple_input(1, 2)
+    add_connection(graph, connection)
+    pivot_input = transform_schema.PivotInput(pivot_column='groups', value_col='sales_data', index_columns=['Country'],
+                                              aggregations=['sum'])
+    pivot_settings = input_schema.NodePivot(flow_id=1, node_id=2, pivot_input=pivot_input)
+    graph.add_pivot(pivot_settings)
+
+
+def test_schema_callback_in_graph():
+    pivot_input = transform_schema.PivotInput(index_columns=['Country'], pivot_column='groups',
+                                              value_col='sales_data', aggregations=['sum'])
+
+    data = (FlowDataEngine.create_random(10000)
+            .apply_flowfile_formula('random_int(0, 4)', 'groups')
+            .select_columns(['groups', 'Country', 'Work', 'sales_data']))
+    node_input_schema = data.schema
+    input_lf = data.data_frame
+    result_schema = pre_calculate_pivot_schema(node_input_schema=node_input_schema,
+                                               pivot_input=pivot_input,
+                                               input_lf=input_lf)
+    result_data = FlowDataEngine.create_from_schema(result_schema)
+    expected_schema = [input_schema.MinimalFieldInfo(name="Country", data_type="String"),
+                       input_schema.MinimalFieldInfo(name='0', data_type='Float64'),
+                       input_schema.MinimalFieldInfo(name='1', data_type='Float64'),
+                       input_schema.MinimalFieldInfo(name='2', data_type='Float64'),
+                       input_schema.MinimalFieldInfo(name='3', data_type='Float64')]
+    expected_data = FlowDataEngine.create_from_schema([FlowfileColumn.create_from_minimal_field_info(mfi)
+                                                       for mfi in expected_schema])
+    result_data.assert_equal(expected_data)
+
+
 
 def test_add_pivot_string_count():
     graph = create_graph()
     input_data = (FlowDataEngine.create_random(10000)
@@ -533,8 +572,8 @@ def test_add_pivot_string_count():
     pivot_settings = input_schema.NodePivot(flow_id=1, node_id=2, pivot_input=pivot_input)
     graph.add_pivot(pivot_settings)
     predicted_df = graph.get_node(2).get_predicted_resulting_data()
-    assert set(predicted_df.columns) == {'Country', '0_count', '3_count', '2_count',
-                                         '1_count'}, 'Columns should be Country, 0_count, 3_count, 2_count, 1_count'
+    assert set(predicted_df.columns) == {'Country', '0', '3', '2',
+                                         '1'}, 'Columns should be Country, 0, 3, 2, 1'
     assert {'str', 'numeric', 'numeric', 'numeric', 'numeric'} == set(
         p.generic_datatype() for p in predicted_df.schema), 'Data types should be the same'
     run_info = graph.run_graph()
@@ -555,8 +594,8 @@ def test_add_pivot_string_concat():
     pivot_settings = input_schema.NodePivot(flow_id=1, node_id=2, pivot_input=pivot_input)
     graph.add_pivot(pivot_settings)
     predicted_df = graph.get_node(2).get_predicted_resulting_data()
-    assert set(predicted_df.columns) == {'Country', '0_concat', '3_concat', '2_concat',
-                                         '1_concat'}, 'Columns should be Country, 0_concat, 3_concat, 2_concat, 1_concat'
+    assert set(predicted_df.columns) == {'Country', '0', '3', '2',
+                                         '1'}, 'Columns should be Country, 0, 3, 2, 1'
     assert {'str'} == set(p.generic_datatype() for p in predicted_df.schema), 'Data types should be the same'
     run_info = graph.run_graph()
     handle_run_info(run_info)
@@ -577,7 +616,7 @@ def test_try_add_to_big_pivot():
     pivot_settings = input_schema.NodePivot(flow_id=1, node_id=2, pivot_input=pivot_input)
     graph.add_pivot(pivot_settings)
     predicted_df = graph.get_node(2).get_predicted_resulting_data()
-    expected_columns = ['Country'] + [f'{i + 1}_sum' for i in range(200)]
+    expected_columns = ['Country'] + [f'{i + 1}' for i in range(200)]
     assert set(predicted_df.columns) == set(expected_columns), 'Should not have calculated the columns'
     run_info = graph.run_graph()
     handle_run_info(run_info)
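
Taken together, the updated tests pin down the new contract: a single aggregation yields bare value names regardless of which aggregation is used ('sum', 'count', 'concat'), up to the 200-unique-value cap. A condensed direct invocation of the callback, adapted from test_schema_callback_in_graph above — all names come from this diff, but treat the call as a sketch rather than a stable public API:

```python
from flowfile_core.flowfile.flow_data_engine.flow_data_engine import FlowDataEngine
from flowfile_core.flowfile.schema_callbacks import pre_calculate_pivot_schema
from flowfile_core.schemas import transform_schema

# Build a small random frame with a 4-value pivot column, as the tests do.
data = (FlowDataEngine.create_random(10000)
        .apply_flowfile_formula('random_int(0, 4)', 'groups')
        .select_columns(['groups', 'Country', 'sales_data']))

result_schema = pre_calculate_pivot_schema(
    node_input_schema=data.schema,
    pivot_input=transform_schema.PivotInput(index_columns=['Country'],
                                            pivot_column='groups',
                                            value_col='sales_data',
                                            aggregations=['sum']),
    input_lf=data.data_frame,
)
# Expect one FlowfileColumn per index column plus one per pivot value:
# Country, '0', '1', '2', '3' — with no '_sum' suffix.
print(result_schema)
```
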
local_data/local_products.parquet (binary file removed)
mkdocs.yml (3 changes: 3 additions & 0 deletions)
@@ -3,6 +3,8 @@
 site_name: Flowfile
 site_description: A visual ETL tool combining drag-and-drop workflow building with Polars dataframes
 site_url: https://edwardvaneechoud.github.io/Flowfile/
+repo_url: https://github.com/edwardvaneechoud/Flowfile
+
 
 # Explicitly set the URL style for consistency and planning
 use_directory_urls: false
@@ -67,6 +69,7 @@ markdown_extensions:
       - name: mermaid
         class: mermaid
         format: !!python/name:pymdownx.superfences.fence_code_format
+  - footnotes
 
 # MkDocs Navigation Structure
 