Skip to content

Commit c1d1d1b

Browse files
Migrating to pl-fuzzy-frame-match (#108)
* Migrating to pl-fuzzy-frame-match * adding fuzzy match * Adding fuzzy match method to flowgraph * Schema callback changes in fuzzy match * Fixing tests and increasing overlap between generator and flowfile * Adapted pl-fuzzy-frame-match changes in branch * fix issue with test * adding prints to the test to debug * Make the schema_callback.py threadsafe and the object in fuzzy matching as well. * remove warning in _handle_fuzzy_match * increasing version fuzzy frame match * reverting change in the execution * Improve threading and order fuzzy match results based on incoming data
1 parent eba8025 commit c1d1d1b

File tree

35 files changed

+776
-2401
lines changed

35 files changed

+776
-2401
lines changed

flowfile/flowfile/__init__.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@
3636
get_all_available_cloud_storage_connections,
3737
create_cloud_storage_connection,
3838
del_cloud_storage_connection,
39-
create_cloud_storage_connection_if_not_exists)
39+
create_cloud_storage_connection_if_not_exists,
40+
FuzzyMapping)
4041
from flowfile_frame.expr import (
4142
col, lit, column, cum_count, len,
4243
sum, min, max, mean, count, when
@@ -71,7 +72,7 @@
7172
'scan_csv_from_cloud_storage', 'get_all_available_cloud_storage_connections', 'create_cloud_storage_connection',
7273
'del_cloud_storage_connection', 'create_cloud_storage_connection_if_not_exists',
7374
'FlowGraph', 'FlowDataEngine', 'node_interface', 'FlowSettings', 'transform_schema',
74-
'FlowNode', 'FlowfileColumn', 'FlowInformation',
75+
'FlowNode', 'FlowfileColumn', 'FlowInformation', "FuzzyMapping",
7576

7677
# Expression API
7778
'col', 'lit', 'column', 'cum_count', 'len',

flowfile_core/flowfile_core/flowfile/code_generator/code_generator.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
from typing import List, Dict, Optional, Set, Tuple
22
import polars as pl
33

4+
from pl_fuzzy_frame_match.models import FuzzyMapping
5+
46
from flowfile_core.flowfile.flow_graph import FlowGraph
57
from flowfile_core.flowfile.flow_data_engine.flow_file_column.main import FlowfileColumn, convert_pl_type_to_string
68
from flowfile_core.flowfile.flow_data_engine.flow_file_column.utils import cast_str_to_polars_type
@@ -825,6 +827,40 @@ def _handle_sample(self, settings: input_schema.NodeSample, var_name: str, input
825827
self._add_code(f"{var_name} = {input_df}.head(n={settings.sample_size})")
826828
self._add_code("")
827829

830+
@staticmethod
831+
def _transform_fuzzy_mappings_to_string(fuzzy_mappings: List[FuzzyMapping]) -> str:
832+
output_str = "["
833+
for i, fuzzy_mapping in enumerate(fuzzy_mappings):
834+
835+
output_str += (f"FuzzyMapping(left_col='{fuzzy_mapping.left_col}',"
836+
f" right_col='{fuzzy_mapping.right_col}', "
837+
f"threshold_score={fuzzy_mapping.threshold_score}, "
838+
f"fuzzy_type='{fuzzy_mapping.fuzzy_type}')")
839+
if i < len(fuzzy_mappings) - 1:
840+
output_str += ",\n"
841+
output_str += "]"
842+
return output_str
843+
844+
def _handle_fuzzy_match(self, settings: input_schema.NodeFuzzyMatch, var_name: str, input_vars: Dict[str, str]) -> None:
845+
"""Handle fuzzy match nodes."""
846+
self.imports.add("from pl_fuzzy_frame_match import FuzzyMapping, fuzzy_match_dfs")
847+
left_df = input_vars.get('main', input_vars.get('main_0', 'df_left'))
848+
right_df = input_vars.get('right', input_vars.get('main_1', 'df_right'))
849+
if left_df == right_df:
850+
right_df = "df_right"
851+
self._add_code(f"{right_df} = {left_df}")
852+
853+
if settings.join_input.left_select.has_drop_cols():
854+
self._add_code(f"{left_df} = {left_df}.drop({[c.old_name for c in settings.join_input.left_select.non_jk_drop_columns]})")
855+
if settings.join_input.right_select.has_drop_cols():
856+
self._add_code(f"{right_df} = {right_df}.drop({[c.old_name for c in settings.join_input.right_select.non_jk_drop_columns]})")
857+
858+
fuzzy_join_mapping_settings = self._transform_fuzzy_mappings_to_string(settings.join_input.join_mapping)
859+
self._add_code(f"{var_name} = fuzzy_match_dfs(\n"
860+
f" left_df={left_df}, right_df={right_df},\n"
861+
f" fuzzy_maps={fuzzy_join_mapping_settings}\n"
862+
f" ).lazy()")
863+
828864
def _handle_unique(self, settings: input_schema.NodeUnique, var_name: str, input_vars: Dict[str, str]) -> None:
829865
"""Handle unique/distinct nodes."""
830866
input_df = input_vars.get('main', 'df')

flowfile_core/flowfile_core/flowfile/flow_data_engine/flow_data_engine.py

Lines changed: 32 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
from math import ceil
77
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union, TypeVar, Literal, Generator
88

9+
from pl_fuzzy_frame_match import FuzzyMapping, fuzzy_match_dfs
10+
911
# Third-party imports
1012
from loky import Future
1113
import polars as pl
@@ -1650,68 +1652,41 @@ def start_fuzzy_join(self, fuzzy_match_input: transform_schemas.FuzzyMatchInput,
16501652
An `ExternalFuzzyMatchFetcher` object that can be used to track the
16511653
progress and retrieve the result of the fuzzy join.
16521654
"""
1653-
left_df, right_df = prepare_for_fuzzy_match(left=self, right=other,
1654-
fuzzy_match_input=fuzzy_match_input)
1655+
left_df, right_df = prepare_for_fuzzy_match(left=self, right=other, fuzzy_match_input=fuzzy_match_input)
16551656
return ExternalFuzzyMatchFetcher(left_df, right_df,
16561657
fuzzy_maps=fuzzy_match_input.fuzzy_maps,
16571658
file_ref=file_ref + '_fm',
16581659
wait_on_completion=False,
16591660
flow_id=flow_id,
16601661
node_id=node_id)
16611662

1662-
def do_fuzzy_join(self, fuzzy_match_input: transform_schemas.FuzzyMatchInput,
1663-
other: "FlowDataEngine", file_ref: str, flow_id: int = -1,
1664-
node_id: int | str = -1) -> "FlowDataEngine":
1665-
"""Performs a fuzzy join with another DataFrame.
1666-
1667-
This method blocks until the fuzzy join operation is complete.
1668-
1669-
Args:
1670-
fuzzy_match_input: A `FuzzyMatchInput` object with the matching parameters.
1671-
other: The right `FlowDataEngine` to join with.
1672-
file_ref: A reference string for temporary files.
1673-
flow_id: The flow ID for tracking.
1674-
node_id: The node ID for tracking.
1675-
1676-
Returns:
1677-
A new `FlowDataEngine` instance with the result of the fuzzy join.
1678-
"""
1679-
left_df, right_df = prepare_for_fuzzy_match(left=self, right=other,
1680-
fuzzy_match_input=fuzzy_match_input)
1681-
f = ExternalFuzzyMatchFetcher(left_df, right_df,
1682-
fuzzy_maps=fuzzy_match_input.fuzzy_maps,
1683-
file_ref=file_ref + '_fm',
1684-
wait_on_completion=True,
1685-
flow_id=flow_id,
1686-
node_id=node_id)
1687-
return FlowDataEngine(f.get_result())
1688-
1689-
def fuzzy_match(self, right: "FlowDataEngine", left_on: str, right_on: str,
1690-
fuzzy_method: str = 'levenshtein', threshold: float = 0.75) -> "FlowDataEngine":
1691-
"""Performs a simple fuzzy match between two DataFrames on a single column pair.
1692-
1693-
This is a convenience method for a common fuzzy join scenario.
1694-
1695-
Args:
1696-
right: The right `FlowDataEngine` to match against.
1697-
left_on: The column name from the left DataFrame to match on.
1698-
right_on: The column name from the right DataFrame to match on.
1699-
fuzzy_method: The fuzzy matching algorithm to use (e.g., 'levenshtein').
1700-
threshold: The similarity score threshold (0.0 to 1.0) for a match.
1701-
1702-
Returns:
1703-
A new `FlowDataEngine` with the matched data.
1704-
"""
1705-
fuzzy_match_input = transform_schemas.FuzzyMatchInput(
1706-
[transform_schemas.FuzzyMap(
1707-
left_on, right_on,
1708-
fuzzy_type=fuzzy_method,
1709-
threshold_score=threshold
1710-
)],
1711-
left_select=self.columns,
1712-
right_select=right.columns
1713-
)
1714-
return self.do_fuzzy_join(fuzzy_match_input, right, str(id(self)))
1663+
def fuzzy_join_external(self,
1664+
fuzzy_match_input: transform_schemas.FuzzyMatchInput,
1665+
other: "FlowDataEngine",
1666+
file_ref: str = None,
1667+
flow_id: int = -1,
1668+
node_id: int = -1
1669+
):
1670+
if file_ref is None:
1671+
file_ref = str(id(self)) + '_' + str(id(other))
1672+
1673+
left_df, right_df = prepare_for_fuzzy_match(left=self, right=other, fuzzy_match_input=fuzzy_match_input)
1674+
external_tracker = ExternalFuzzyMatchFetcher(left_df, right_df,
1675+
fuzzy_maps=fuzzy_match_input.fuzzy_maps,
1676+
file_ref=file_ref + '_fm',
1677+
wait_on_completion=False,
1678+
flow_id=flow_id,
1679+
node_id=node_id)
1680+
return FlowDataEngine(external_tracker.get_result())
1681+
1682+
def fuzzy_join(self, fuzzy_match_input: transform_schemas.FuzzyMatchInput,
1683+
other: "FlowDataEngine",
1684+
node_logger: NodeLogger = None) -> "FlowDataEngine":
1685+
left_df, right_df = prepare_for_fuzzy_match(left=self, right=other, fuzzy_match_input=fuzzy_match_input)
1686+
fuzzy_mappings = [FuzzyMapping(**fm.__dict__) for fm in fuzzy_match_input.fuzzy_maps]
1687+
return FlowDataEngine(fuzzy_match_dfs(left_df, right_df, fuzzy_maps=fuzzy_mappings,
1688+
logger=node_logger.logger if node_logger else logger)
1689+
.lazy())
17151690

17161691
def do_cross_join(self, cross_join_input: transform_schemas.CrossJoinInput,
17171692
auto_generate_selection: bool, verify_integrity: bool,
@@ -1733,11 +1708,12 @@ def do_cross_join(self, cross_join_input: transform_schemas.CrossJoinInput,
17331708
Exception: If `verify_integrity` is True and the join would result in
17341709
an excessively large number of records.
17351710
"""
1711+
17361712
self.lazy = True
1713+
17371714
other.lazy = True
17381715

17391716
verify_join_select_integrity(cross_join_input, left_columns=self.columns, right_columns=other.columns)
1740-
17411717
right_select = [v.old_name for v in cross_join_input.right_select.renames
17421718
if (v.keep or v.join_key) and v.is_available]
17431719
left_select = [v.old_name for v in cross_join_input.left_select.renames

flowfile_core/flowfile_core/flowfile/flow_data_engine/flow_file_column/main.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,67 @@ def __init__(self, polars_type: PlType):
7676
self.__sql_type = None
7777
self.__perc_unique = None
7878

79+
def __repr__(self):
80+
"""
81+
Provides a concise, developer-friendly representation of the object.
82+
Ideal for debugging and console inspection.
83+
"""
84+
return (f"FlowfileColumn(name='{self.column_name}', "
85+
f"type={self.data_type}, "
86+
f"size={self.size}, "
87+
f"nulls={self.number_of_empty_values})")
88+
89+
def __str__(self):
90+
"""
91+
Provides a detailed, readable summary of the column's metadata.
92+
It conditionally omits any attribute that is None, ensuring a clean output.
93+
"""
94+
# --- Header (Always Shown) ---
95+
header = f"<FlowfileColumn: '{self.column_name}'>"
96+
lines = []
97+
98+
# --- Core Attributes (Conditionally Shown) ---
99+
if self.data_type is not None:
100+
lines.append(f" Type: {self.data_type}")
101+
if self.size is not None:
102+
lines.append(f" Non-Nulls: {self.size}")
103+
104+
# Calculate and display nulls if possible
105+
if self.size is not None and self.number_of_empty_values is not None:
106+
total_entries = self.size + self.number_of_empty_values
107+
if total_entries > 0:
108+
null_perc = (self.number_of_empty_values / total_entries) * 100
109+
null_info = f"{self.number_of_empty_values} ({null_perc:.1f}%)"
110+
else:
111+
null_info = "0 (0.0%)"
112+
lines.append(f" Nulls: {null_info}")
113+
114+
if self.number_of_unique_values is not None:
115+
lines.append(f" Unique: {self.number_of_unique_values}")
116+
117+
# --- Conditional Stats Section ---
118+
stats = []
119+
if self.min_value is not None:
120+
stats.append(f" Min: {self.min_value}")
121+
if self.max_value is not None:
122+
stats.append(f" Max: {self.max_value}")
123+
if self.average_value is not None:
124+
stats.append(f" Mean: {self.average_value}")
125+
126+
if stats:
127+
lines.append(" Stats:")
128+
lines.extend(stats)
129+
130+
# --- Conditional Examples Section ---
131+
if self.example_values:
132+
example_str = str(self.example_values)
133+
# Truncate long example strings for cleaner display
134+
if len(example_str) > 70:
135+
example_str = example_str[:67] + '...'
136+
lines.append(f" Examples: {example_str}")
137+
138+
return f"{header}\n" + "\n".join(lines)
139+
79140
@classmethod
80141
def create_from_polars_type(cls, polars_type: PlType, **kwargs) -> "FlowfileColumn":
81142
for k, v in kwargs.items():

flowfile_core/flowfile_core/flowfile/flow_data_engine/fuzzy_matching/prepare_for_fuzzy_match.py

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,49 @@
1-
from flowfile_core.schemas.transform_schema import FuzzyMatchInput
1+
from flowfile_core.schemas.transform_schema import FuzzyMatchInput, SelectInput, JoinInputs
22
from flowfile_core.flowfile.flow_data_engine.join import verify_join_select_integrity, verify_join_map_integrity
33
import polars as pl
4-
from typing import TYPE_CHECKING, Tuple
4+
from typing import TYPE_CHECKING, Tuple, List
55

66
if TYPE_CHECKING:
77
from flowfile_core.flowfile.flow_data_engine.flow_data_engine import FlowDataEngine
88

99

10+
def _order_join_inputs_based_on_col_order(col_order: List[str], join_inputs: JoinInputs) -> None:
11+
"""
12+
Ensure that the select columns in the fuzzy match input match the order of the incoming columns.
13+
This function modifies the join_inputs object in-place.
14+
15+
Returns:
16+
None
17+
"""
18+
select_map = {select.new_name: select for select in join_inputs.renames}
19+
ordered_renames = [select_map[col] for col in col_order if col in select_map]
20+
join_inputs.renames = ordered_renames
21+
22+
23+
def _ensure_all_columns_have_select(left: "FlowDataEngine",
24+
right: "FlowDataEngine",
25+
fuzzy_match_input: FuzzyMatchInput):
26+
"""
27+
Ensure that all columns in the left and right FlowDataEngines are included in the fuzzy match input's select
28+
statements.
29+
Args:
30+
left (FlowDataEngine):
31+
right (FlowDataEngine):
32+
fuzzy_match_input ():
33+
34+
Returns:
35+
None
36+
"""
37+
right_cols_in_select = {c.old_name for c in fuzzy_match_input.right_select.renames}
38+
left_cols_in_select = {c.old_name for c in fuzzy_match_input.left_select.renames}
39+
40+
fuzzy_match_input.left_select.renames.extend(
41+
[SelectInput(col) for col in left.columns if col not in left_cols_in_select])
42+
fuzzy_match_input.right_select.renames.extend(
43+
[SelectInput(col) for col in right.columns if col not in right_cols_in_select]
44+
)
45+
46+
1047
def prepare_for_fuzzy_match(left: "FlowDataEngine", right: "FlowDataEngine",
1148
fuzzy_match_input: FuzzyMatchInput) -> Tuple[pl.LazyFrame, pl.LazyFrame]:
1249
"""
@@ -19,14 +56,18 @@ def prepare_for_fuzzy_match(left: "FlowDataEngine", right: "FlowDataEngine",
1956
Returns:
2057
Tuple[pl.LazyFrame, pl.LazyFrame]: Prepared left and right lazy frames
2158
"""
22-
2359
left.lazy = True
2460
right.lazy = True
61+
_ensure_all_columns_have_select(left, right, fuzzy_match_input)
62+
_order_join_inputs_based_on_col_order(left.columns, fuzzy_match_input.left_select)
63+
_order_join_inputs_based_on_col_order(right.columns, fuzzy_match_input.right_select)
64+
2565
verify_join_select_integrity(fuzzy_match_input, left_columns=left.columns, right_columns=right.columns)
2666
if not verify_join_map_integrity(fuzzy_match_input, left_columns=left.schema, right_columns=right.schema):
2767
raise Exception('Join is not valid by the data fields')
2868
fuzzy_match_input = fuzzy_match_input
2969
fuzzy_match_input.auto_rename()
70+
3071
right_select = [v.old_name for v in fuzzy_match_input.right_select.renames if
3172
(v.keep or v.join_key) and v.is_available]
3273
left_select = [v.old_name for v in fuzzy_match_input.left_select.renames if

flowfile_core/flowfile_core/flowfile/flow_data_engine/subprocess_operations/models.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from typing import Any, Optional, Literal
22
from pydantic import BaseModel
3-
from flowfile_core.schemas.transform_schema import FuzzyMap
3+
from pl_fuzzy_frame_match.models import FuzzyMapping
44

55
OperationType = Literal['store', 'calculate_schema', 'calculate_number_of_records', 'write_output', 'store_sample']
66

@@ -20,8 +20,8 @@ class FuzzyJoinInput(BaseModel):
2020
cache_dir: Optional[str] = None
2121
left_df_operation: PolarsOperation
2222
right_df_operation: PolarsOperation
23-
fuzzy_maps: list[FuzzyMap]
24-
flowfile_node_id: int|str
23+
fuzzy_maps: list[FuzzyMapping]
24+
flowfile_node_id: int | str
2525
flowfile_flow_id: int
2626

2727

flowfile_core/flowfile_core/flowfile/flow_data_engine/subprocess_operations/subprocess_operations.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,12 @@
99
import polars as pl
1010
import requests
1111

12+
from pl_fuzzy_frame_match.models import FuzzyMapping
13+
1214
from flowfile_core.configs import logger
1315
from flowfile_core.configs.settings import WORKER_URL
1416
from flowfile_core.flowfile.flow_data_engine.subprocess_operations.models import (
1517
FuzzyJoinInput,
16-
FuzzyMap,
1718
OperationType,
1819
PolarsOperation,
1920
Status
@@ -53,7 +54,7 @@ def trigger_sample_operation(lf: pl.LazyFrame, file_ref: str, flow_id: int, node
5354

5455

5556
def trigger_fuzzy_match_operation(left_df: pl.LazyFrame, right_df: pl.LazyFrame,
56-
fuzzy_maps: List[FuzzyMap],
57+
fuzzy_maps: List[FuzzyMapping],
5758
file_ref: str,
5859
flow_id: int,
5960
node_id: int | str) -> Status:

0 commit comments

Comments
 (0)