Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions flowfile/flowfile/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
get_all_available_cloud_storage_connections,
create_cloud_storage_connection,
del_cloud_storage_connection,
create_cloud_storage_connection_if_not_exists)
create_cloud_storage_connection_if_not_exists,
FuzzyMapping)
from flowfile_frame.expr import (
col, lit, column, cum_count, len,
sum, min, max, mean, count, when
Expand Down Expand Up @@ -71,7 +72,7 @@
'scan_csv_from_cloud_storage', 'get_all_available_cloud_storage_connections', 'create_cloud_storage_connection',
'del_cloud_storage_connection', 'create_cloud_storage_connection_if_not_exists',
'FlowGraph', 'FlowDataEngine', 'node_interface', 'FlowSettings', 'transform_schema',
'FlowNode', 'FlowfileColumn', 'FlowInformation',
'FlowNode', 'FlowfileColumn', 'FlowInformation', "FuzzyMapping",

# Expression API
'col', 'lit', 'column', 'cum_count', 'len',
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from typing import List, Dict, Optional, Set, Tuple
import polars as pl

from pl_fuzzy_frame_match.models import FuzzyMapping

from flowfile_core.flowfile.flow_graph import FlowGraph
from flowfile_core.flowfile.flow_data_engine.flow_file_column.main import FlowfileColumn, convert_pl_type_to_string
from flowfile_core.flowfile.flow_data_engine.flow_file_column.utils import cast_str_to_polars_type
Expand Down Expand Up @@ -825,6 +827,40 @@ def _handle_sample(self, settings: input_schema.NodeSample, var_name: str, input
self._add_code(f"{var_name} = {input_df}.head(n={settings.sample_size})")
self._add_code("")

@staticmethod
def _transform_fuzzy_mappings_to_string(fuzzy_mappings: List[FuzzyMapping]) -> str:
output_str = "["
for i, fuzzy_mapping in enumerate(fuzzy_mappings):

output_str += (f"FuzzyMapping(left_col='{fuzzy_mapping.left_col}',"
f" right_col='{fuzzy_mapping.right_col}', "
f"threshold_score={fuzzy_mapping.threshold_score}, "
f"fuzzy_type='{fuzzy_mapping.fuzzy_type}')")
if i < len(fuzzy_mappings) - 1:
output_str += ",\n"
output_str += "]"
return output_str

def _handle_fuzzy_match(self, settings: input_schema.NodeFuzzyMatch, var_name: str, input_vars: Dict[str, str]) -> None:
    """Handle fuzzy match nodes.

    Emits generated source that (1) optionally drops de-selected columns from
    both inputs and (2) calls ``fuzzy_match_dfs`` with the node's fuzzy
    mappings, binding the lazy result to ``var_name``.
    """
    # Make sure the generated file imports the fuzzy-match dependency.
    self.imports.add("from pl_fuzzy_frame_match import FuzzyMapping, fuzzy_match_dfs")
    # Resolve the generated variable names of the two inputs, falling back to
    # the alternate connection keys and finally to default names.
    left_df = input_vars.get('main', input_vars.get('main_0', 'df_left'))
    right_df = input_vars.get('right', input_vars.get('main_1', 'df_right'))
    # Self-join: alias the right side to a fresh variable so the generated
    # drop statements below do not both rewrite the same name.
    if left_df == right_df:
        right_df = "df_right"
        self._add_code(f"{right_df} = {left_df}")

    # Emit drop() calls only when the select settings actually remove columns.
    # The list comprehension's repr renders as a valid Python list literal.
    if settings.join_input.left_select.has_drop_cols():
        self._add_code(f"{left_df} = {left_df}.drop({[c.old_name for c in settings.join_input.left_select.non_jk_drop_columns]})")
    if settings.join_input.right_select.has_drop_cols():
        self._add_code(f"{right_df} = {right_df}.drop({[c.old_name for c in settings.join_input.right_select.non_jk_drop_columns]})")

    # Serialize the fuzzy mappings into a source-code list literal.
    fuzzy_join_mapping_settings = self._transform_fuzzy_mappings_to_string(settings.join_input.join_mapping)
    self._add_code(f"{var_name} = fuzzy_match_dfs(\n"
                   f" left_df={left_df}, right_df={right_df},\n"
                   f" fuzzy_maps={fuzzy_join_mapping_settings}\n"
                   f" ).lazy()")

def _handle_unique(self, settings: input_schema.NodeUnique, var_name: str, input_vars: Dict[str, str]) -> None:
"""Handle unique/distinct nodes."""
input_df = input_vars.get('main', 'df')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from math import ceil
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union, TypeVar, Literal, Generator

from pl_fuzzy_frame_match import FuzzyMapping, fuzzy_match_dfs

# Third-party imports
from loky import Future
import polars as pl
Expand Down Expand Up @@ -1650,68 +1652,41 @@ def start_fuzzy_join(self, fuzzy_match_input: transform_schemas.FuzzyMatchInput,
An `ExternalFuzzyMatchFetcher` object that can be used to track the
progress and retrieve the result of the fuzzy join.
"""
left_df, right_df = prepare_for_fuzzy_match(left=self, right=other,
fuzzy_match_input=fuzzy_match_input)
left_df, right_df = prepare_for_fuzzy_match(left=self, right=other, fuzzy_match_input=fuzzy_match_input)
return ExternalFuzzyMatchFetcher(left_df, right_df,
fuzzy_maps=fuzzy_match_input.fuzzy_maps,
file_ref=file_ref + '_fm',
wait_on_completion=False,
flow_id=flow_id,
node_id=node_id)

def do_fuzzy_join(self, fuzzy_match_input: transform_schemas.FuzzyMatchInput,
other: "FlowDataEngine", file_ref: str, flow_id: int = -1,
node_id: int | str = -1) -> "FlowDataEngine":
"""Performs a fuzzy join with another DataFrame.

This method blocks until the fuzzy join operation is complete.

Args:
fuzzy_match_input: A `FuzzyMatchInput` object with the matching parameters.
other: The right `FlowDataEngine` to join with.
file_ref: A reference string for temporary files.
flow_id: The flow ID for tracking.
node_id: The node ID for tracking.

Returns:
A new `FlowDataEngine` instance with the result of the fuzzy join.
"""
left_df, right_df = prepare_for_fuzzy_match(left=self, right=other,
fuzzy_match_input=fuzzy_match_input)
f = ExternalFuzzyMatchFetcher(left_df, right_df,
fuzzy_maps=fuzzy_match_input.fuzzy_maps,
file_ref=file_ref + '_fm',
wait_on_completion=True,
flow_id=flow_id,
node_id=node_id)
return FlowDataEngine(f.get_result())

def fuzzy_match(self, right: "FlowDataEngine", left_on: str, right_on: str,
fuzzy_method: str = 'levenshtein', threshold: float = 0.75) -> "FlowDataEngine":
"""Performs a simple fuzzy match between two DataFrames on a single column pair.

This is a convenience method for a common fuzzy join scenario.

Args:
right: The right `FlowDataEngine` to match against.
left_on: The column name from the left DataFrame to match on.
right_on: The column name from the right DataFrame to match on.
fuzzy_method: The fuzzy matching algorithm to use (e.g., 'levenshtein').
threshold: The similarity score threshold (0.0 to 1.0) for a match.

Returns:
A new `FlowDataEngine` with the matched data.
"""
fuzzy_match_input = transform_schemas.FuzzyMatchInput(
[transform_schemas.FuzzyMap(
left_on, right_on,
fuzzy_type=fuzzy_method,
threshold_score=threshold
)],
left_select=self.columns,
right_select=right.columns
)
return self.do_fuzzy_join(fuzzy_match_input, right, str(id(self)))
def fuzzy_join_external(self,
                        fuzzy_match_input: transform_schemas.FuzzyMatchInput,
                        other: "FlowDataEngine",
                        file_ref: Optional[str] = None,
                        flow_id: int = -1,
                        node_id: int | str = -1
                        ) -> "FlowDataEngine":
    """Perform a fuzzy join in the external worker process and return the result.

    This method blocks until the external fuzzy join has produced its output.

    Args:
        fuzzy_match_input: A `FuzzyMatchInput` object with the matching parameters.
        other: The right `FlowDataEngine` to join with.
        file_ref: Reference string for the worker's temporary files. Defaults to
            a value derived from the two engines' object ids.
        flow_id: The flow ID for tracking.
        node_id: The node ID for tracking.

    Returns:
        A new `FlowDataEngine` instance with the result of the fuzzy join.
    """
    if file_ref is None:
        # Same value as str(id(self)) + '_' + str(id(other)).
        file_ref = f"{id(self)}_{id(other)}"

    left_df, right_df = prepare_for_fuzzy_match(left=self, right=other, fuzzy_match_input=fuzzy_match_input)
    # NOTE(review): wait_on_completion=False, yet get_result() below is relied
    # on to block until the worker finishes — confirm the fetcher's semantics.
    external_tracker = ExternalFuzzyMatchFetcher(left_df, right_df,
                                                 fuzzy_maps=fuzzy_match_input.fuzzy_maps,
                                                 file_ref=file_ref + '_fm',
                                                 wait_on_completion=False,
                                                 flow_id=flow_id,
                                                 node_id=node_id)
    return FlowDataEngine(external_tracker.get_result())

def fuzzy_join(self, fuzzy_match_input: transform_schemas.FuzzyMatchInput,
               other: "FlowDataEngine",
               node_logger: NodeLogger = None) -> "FlowDataEngine":
    """Run a fuzzy join in-process and return the matched result.

    Args:
        fuzzy_match_input: A `FuzzyMatchInput` object with the matching parameters.
        other: The right `FlowDataEngine` to join with.
        node_logger: Optional node logger; its underlying logger is handed to
            the matcher, otherwise the module-level logger is used.

    Returns:
        A new `FlowDataEngine` wrapping the lazy fuzzy-join output.
    """
    prepared_left, prepared_right = prepare_for_fuzzy_match(
        left=self, right=other, fuzzy_match_input=fuzzy_match_input)
    # Re-wrap the core fuzzy-map settings as pl_fuzzy_frame_match mappings.
    mappings = [FuzzyMapping(**fuzzy_map.__dict__) for fuzzy_map in fuzzy_match_input.fuzzy_maps]
    active_logger = node_logger.logger if node_logger else logger
    matched = fuzzy_match_dfs(prepared_left, prepared_right,
                              fuzzy_maps=mappings,
                              logger=active_logger)
    return FlowDataEngine(matched.lazy())

def do_cross_join(self, cross_join_input: transform_schemas.CrossJoinInput,
auto_generate_selection: bool, verify_integrity: bool,
Expand All @@ -1733,11 +1708,12 @@ def do_cross_join(self, cross_join_input: transform_schemas.CrossJoinInput,
Exception: If `verify_integrity` is True and the join would result in
an excessively large number of records.
"""

self.lazy = True

other.lazy = True

verify_join_select_integrity(cross_join_input, left_columns=self.columns, right_columns=other.columns)

right_select = [v.old_name for v in cross_join_input.right_select.renames
if (v.keep or v.join_key) and v.is_available]
left_select = [v.old_name for v in cross_join_input.left_select.renames
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,67 @@ def __init__(self, polars_type: PlType):
self.__sql_type = None
self.__perc_unique = None

def __repr__(self):
    """Return a compact one-line summary, suited to debugging and console use."""
    fields = ", ".join([
        f"name='{self.column_name}'",
        f"type={self.data_type}",
        f"size={self.size}",
        f"nulls={self.number_of_empty_values}",
    ])
    return f"FlowfileColumn({fields})"

def __str__(self) -> str:
    """
    Provides a detailed, readable summary of the column's metadata.
    It conditionally omits any attribute that is None, ensuring a clean output.
    """
    # --- Header (Always Shown) ---
    header = f"<FlowfileColumn: '{self.column_name}'>"
    lines = []

    # --- Core Attributes (Conditionally Shown) ---
    if self.data_type is not None:
        lines.append(f" Type: {self.data_type}")
    if self.size is not None:
        # NOTE(review): `size` is labeled "Non-Nulls" and is added to
        # `number_of_empty_values` below to form the total — presumably it
        # counts non-null entries only; confirm against the schema source.
        lines.append(f" Non-Nulls: {self.size}")

    # Calculate and display nulls if possible
    if self.size is not None and self.number_of_empty_values is not None:
        total_entries = self.size + self.number_of_empty_values
        if total_entries > 0:
            null_perc = (self.number_of_empty_values / total_entries) * 100
            null_info = f"{self.number_of_empty_values} ({null_perc:.1f}%)"
        else:
            # Avoid division by zero for a fully empty column.
            null_info = "0 (0.0%)"
        lines.append(f" Nulls: {null_info}")

    if self.number_of_unique_values is not None:
        lines.append(f" Unique: {self.number_of_unique_values}")

    # --- Conditional Stats Section ---
    # Only emitted when at least one of min/max/mean is known.
    stats = []
    if self.min_value is not None:
        stats.append(f" Min: {self.min_value}")
    if self.max_value is not None:
        stats.append(f" Max: {self.max_value}")
    if self.average_value is not None:
        stats.append(f" Mean: {self.average_value}")

    if stats:
        lines.append(" Stats:")
        lines.extend(stats)

    # --- Conditional Examples Section ---
    if self.example_values:
        example_str = str(self.example_values)
        # Truncate long example strings for cleaner display
        if len(example_str) > 70:
            example_str = example_str[:67] + '...'
        lines.append(f" Examples: {example_str}")

    return f"{header}\n" + "\n".join(lines)

@classmethod
def create_from_polars_type(cls, polars_type: PlType, **kwargs) -> "FlowfileColumn":
for k, v in kwargs.items():
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,49 @@
from flowfile_core.schemas.transform_schema import FuzzyMatchInput
from flowfile_core.schemas.transform_schema import FuzzyMatchInput, SelectInput, JoinInputs
from flowfile_core.flowfile.flow_data_engine.join import verify_join_select_integrity, verify_join_map_integrity
import polars as pl
from typing import TYPE_CHECKING, Tuple
from typing import TYPE_CHECKING, Tuple, List

if TYPE_CHECKING:
from flowfile_core.flowfile.flow_data_engine.flow_data_engine import FlowDataEngine


def _order_join_inputs_based_on_col_order(col_order: List[str], join_inputs: JoinInputs) -> None:
"""
Ensure that the select columns in the fuzzy match input match the order of the incoming columns.
This function modifies the join_inputs object in-place.

Returns:
None
"""
select_map = {select.new_name: select for select in join_inputs.renames}
ordered_renames = [select_map[col] for col in col_order if col in select_map]
join_inputs.renames = ordered_renames


def _ensure_all_columns_have_select(left: "FlowDataEngine",
                                    right: "FlowDataEngine",
                                    fuzzy_match_input: FuzzyMatchInput):
    """Append a ``SelectInput`` for every engine column missing from the selects.

    Columns already present (matched by ``old_name``) are left untouched; the
    missing ones are appended in engine column order. Mutates
    ``fuzzy_match_input`` in place.

    Args:
        left (FlowDataEngine):
        right (FlowDataEngine):
        fuzzy_match_input ():

    Returns:
        None
    """
    already_left = {rename.old_name for rename in fuzzy_match_input.left_select.renames}
    already_right = {rename.old_name for rename in fuzzy_match_input.right_select.renames}

    for column in left.columns:
        if column not in already_left:
            fuzzy_match_input.left_select.renames.append(SelectInput(column))
    for column in right.columns:
        if column not in already_right:
            fuzzy_match_input.right_select.renames.append(SelectInput(column))


def prepare_for_fuzzy_match(left: "FlowDataEngine", right: "FlowDataEngine",
fuzzy_match_input: FuzzyMatchInput) -> Tuple[pl.LazyFrame, pl.LazyFrame]:
"""
Expand All @@ -19,14 +56,18 @@ def prepare_for_fuzzy_match(left: "FlowDataEngine", right: "FlowDataEngine",
Returns:
Tuple[pl.LazyFrame, pl.LazyFrame]: Prepared left and right lazy frames
"""

left.lazy = True
right.lazy = True
_ensure_all_columns_have_select(left, right, fuzzy_match_input)
_order_join_inputs_based_on_col_order(left.columns, fuzzy_match_input.left_select)
_order_join_inputs_based_on_col_order(right.columns, fuzzy_match_input.right_select)

verify_join_select_integrity(fuzzy_match_input, left_columns=left.columns, right_columns=right.columns)
if not verify_join_map_integrity(fuzzy_match_input, left_columns=left.schema, right_columns=right.schema):
raise Exception('Join is not valid by the data fields')
fuzzy_match_input = fuzzy_match_input
fuzzy_match_input.auto_rename()

right_select = [v.old_name for v in fuzzy_match_input.right_select.renames if
(v.keep or v.join_key) and v.is_available]
left_select = [v.old_name for v in fuzzy_match_input.left_select.renames if
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Any, Optional, Literal
from pydantic import BaseModel
from flowfile_core.schemas.transform_schema import FuzzyMap
from pl_fuzzy_frame_match.models import FuzzyMapping

OperationType = Literal['store', 'calculate_schema', 'calculate_number_of_records', 'write_output', 'store_sample']

Expand All @@ -20,8 +20,8 @@ class FuzzyJoinInput(BaseModel):
cache_dir: Optional[str] = None
left_df_operation: PolarsOperation
right_df_operation: PolarsOperation
fuzzy_maps: list[FuzzyMap]
flowfile_node_id: int|str
fuzzy_maps: list[FuzzyMapping]
flowfile_node_id: int | str
flowfile_flow_id: int


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@
import polars as pl
import requests

from pl_fuzzy_frame_match.models import FuzzyMapping

from flowfile_core.configs import logger
from flowfile_core.configs.settings import WORKER_URL
from flowfile_core.flowfile.flow_data_engine.subprocess_operations.models import (
FuzzyJoinInput,
FuzzyMap,
OperationType,
PolarsOperation,
Status
Expand Down Expand Up @@ -53,7 +54,7 @@ def trigger_sample_operation(lf: pl.LazyFrame, file_ref: str, flow_id: int, node


def trigger_fuzzy_match_operation(left_df: pl.LazyFrame, right_df: pl.LazyFrame,
fuzzy_maps: List[FuzzyMap],
fuzzy_maps: List[FuzzyMapping],
file_ref: str,
flow_id: int,
node_id: int | str) -> Status:
Expand Down
Loading
Loading