diff --git a/flowfile/flowfile/__init__.py b/flowfile/flowfile/__init__.py index de7db989..595f43bd 100644 --- a/flowfile/flowfile/__init__.py +++ b/flowfile/flowfile/__init__.py @@ -36,7 +36,8 @@ get_all_available_cloud_storage_connections, create_cloud_storage_connection, del_cloud_storage_connection, - create_cloud_storage_connection_if_not_exists) + create_cloud_storage_connection_if_not_exists, + FuzzyMapping) from flowfile_frame.expr import ( col, lit, column, cum_count, len, sum, min, max, mean, count, when @@ -71,7 +72,7 @@ 'scan_csv_from_cloud_storage', 'get_all_available_cloud_storage_connections', 'create_cloud_storage_connection', 'del_cloud_storage_connection', 'create_cloud_storage_connection_if_not_exists', 'FlowGraph', 'FlowDataEngine', 'node_interface', 'FlowSettings', 'transform_schema', - 'FlowNode', 'FlowfileColumn', 'FlowInformation', + 'FlowNode', 'FlowfileColumn', 'FlowInformation', "FuzzyMapping", # Expression API 'col', 'lit', 'column', 'cum_count', 'len', diff --git a/flowfile_core/flowfile_core/flowfile/code_generator/code_generator.py b/flowfile_core/flowfile_core/flowfile/code_generator/code_generator.py index c6a5909d..e9cd6847 100644 --- a/flowfile_core/flowfile_core/flowfile/code_generator/code_generator.py +++ b/flowfile_core/flowfile_core/flowfile/code_generator/code_generator.py @@ -1,6 +1,8 @@ from typing import List, Dict, Optional, Set, Tuple import polars as pl +from pl_fuzzy_frame_match.models import FuzzyMapping + from flowfile_core.flowfile.flow_graph import FlowGraph from flowfile_core.flowfile.flow_data_engine.flow_file_column.main import FlowfileColumn, convert_pl_type_to_string from flowfile_core.flowfile.flow_data_engine.flow_file_column.utils import cast_str_to_polars_type @@ -825,6 +827,40 @@ def _handle_sample(self, settings: input_schema.NodeSample, var_name: str, input self._add_code(f"{var_name} = {input_df}.head(n={settings.sample_size})") self._add_code("") + @staticmethod + def _transform_fuzzy_mappings_to_string(fuzzy_mappings: List[FuzzyMapping]) -> str: + output_str = "[" + for i, fuzzy_mapping in enumerate(fuzzy_mappings): + + output_str += (f"FuzzyMapping(left_col='{fuzzy_mapping.left_col}'," + f" right_col='{fuzzy_mapping.right_col}', " + f"threshold_score={fuzzy_mapping.threshold_score}, " + f"fuzzy_type='{fuzzy_mapping.fuzzy_type}')") + if i < len(fuzzy_mappings) - 1: + output_str += ",\n" + output_str += "]" + return output_str + + def _handle_fuzzy_match(self, settings: input_schema.NodeFuzzyMatch, var_name: str, input_vars: Dict[str, str]) -> None: + """Handle fuzzy match nodes.""" + self.imports.add("from pl_fuzzy_frame_match import FuzzyMapping, fuzzy_match_dfs") + left_df = input_vars.get('main', input_vars.get('main_0', 'df_left')) + right_df = input_vars.get('right', input_vars.get('main_1', 'df_right')) + if left_df == right_df: + right_df = "df_right" + self._add_code(f"{right_df} = {left_df}") + + if settings.join_input.left_select.has_drop_cols(): + self._add_code(f"{left_df} = {left_df}.drop({[c.old_name for c in settings.join_input.left_select.non_jk_drop_columns]})") + if settings.join_input.right_select.has_drop_cols(): + self._add_code(f"{right_df} = {right_df}.drop({[c.old_name for c in settings.join_input.right_select.non_jk_drop_columns]})") + + fuzzy_join_mapping_settings = self._transform_fuzzy_mappings_to_string(settings.join_input.join_mapping) + self._add_code(f"{var_name} = fuzzy_match_dfs(\n" + f" left_df={left_df}, right_df={right_df},\n" + f" fuzzy_maps={fuzzy_join_mapping_settings}\n" + f" 
).lazy()") + def _handle_unique(self, settings: input_schema.NodeUnique, var_name: str, input_vars: Dict[str, str]) -> None: """Handle unique/distinct nodes.""" input_df = input_vars.get('main', 'df') diff --git a/flowfile_core/flowfile_core/flowfile/flow_data_engine/flow_data_engine.py b/flowfile_core/flowfile_core/flowfile/flow_data_engine/flow_data_engine.py index 806dbada..169be75d 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_data_engine/flow_data_engine.py +++ b/flowfile_core/flowfile_core/flowfile/flow_data_engine/flow_data_engine.py @@ -6,6 +6,8 @@ from math import ceil from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union, TypeVar, Literal, Generator +from pl_fuzzy_frame_match import FuzzyMapping, fuzzy_match_dfs + # Third-party imports from loky import Future import polars as pl @@ -1650,8 +1652,7 @@ def start_fuzzy_join(self, fuzzy_match_input: transform_schemas.FuzzyMatchInput, An `ExternalFuzzyMatchFetcher` object that can be used to track the progress and retrieve the result of the fuzzy join. """ - left_df, right_df = prepare_for_fuzzy_match(left=self, right=other, - fuzzy_match_input=fuzzy_match_input) + left_df, right_df = prepare_for_fuzzy_match(left=self, right=other, fuzzy_match_input=fuzzy_match_input) return ExternalFuzzyMatchFetcher(left_df, right_df, fuzzy_maps=fuzzy_match_input.fuzzy_maps, file_ref=file_ref + '_fm', @@ -1659,59 +1660,33 @@ def start_fuzzy_join(self, fuzzy_match_input: transform_schemas.FuzzyMatchInput, flow_id=flow_id, node_id=node_id) - def do_fuzzy_join(self, fuzzy_match_input: transform_schemas.FuzzyMatchInput, - other: "FlowDataEngine", file_ref: str, flow_id: int = -1, - node_id: int | str = -1) -> "FlowDataEngine": - """Performs a fuzzy join with another DataFrame. - - This method blocks until the fuzzy join operation is complete. - - Args: - fuzzy_match_input: A `FuzzyMatchInput` object with the matching parameters. - other: The right `FlowDataEngine` to join with. - file_ref: A reference string for temporary files. - flow_id: The flow ID for tracking. - node_id: The node ID for tracking. - - Returns: - A new `FlowDataEngine` instance with the result of the fuzzy join. - """ - left_df, right_df = prepare_for_fuzzy_match(left=self, right=other, - fuzzy_match_input=fuzzy_match_input) - f = ExternalFuzzyMatchFetcher(left_df, right_df, - fuzzy_maps=fuzzy_match_input.fuzzy_maps, - file_ref=file_ref + '_fm', - wait_on_completion=True, - flow_id=flow_id, - node_id=node_id) - return FlowDataEngine(f.get_result()) - - def fuzzy_match(self, right: "FlowDataEngine", left_on: str, right_on: str, - fuzzy_method: str = 'levenshtein', threshold: float = 0.75) -> "FlowDataEngine": - """Performs a simple fuzzy match between two DataFrames on a single column pair. - - This is a convenience method for a common fuzzy join scenario. - - Args: - right: The right `FlowDataEngine` to match against. - left_on: The column name from the left DataFrame to match on. - right_on: The column name from the right DataFrame to match on. - fuzzy_method: The fuzzy matching algorithm to use (e.g., 'levenshtein'). - threshold: The similarity score threshold (0.0 to 1.0) for a match. - - Returns: - A new `FlowDataEngine` with the matched data. 
- """ - fuzzy_match_input = transform_schemas.FuzzyMatchInput( - [transform_schemas.FuzzyMap( - left_on, right_on, - fuzzy_type=fuzzy_method, - threshold_score=threshold - )], - left_select=self.columns, - right_select=right.columns - ) - return self.do_fuzzy_join(fuzzy_match_input, right, str(id(self))) + def fuzzy_join_external(self, + fuzzy_match_input: transform_schemas.FuzzyMatchInput, + other: "FlowDataEngine", + file_ref: str = None, + flow_id: int = -1, + node_id: int = -1 + ): + if file_ref is None: + file_ref = str(id(self)) + '_' + str(id(other)) + + left_df, right_df = prepare_for_fuzzy_match(left=self, right=other, fuzzy_match_input=fuzzy_match_input) + external_tracker = ExternalFuzzyMatchFetcher(left_df, right_df, + fuzzy_maps=fuzzy_match_input.fuzzy_maps, + file_ref=file_ref + '_fm', + wait_on_completion=False, + flow_id=flow_id, + node_id=node_id) + return FlowDataEngine(external_tracker.get_result()) + + def fuzzy_join(self, fuzzy_match_input: transform_schemas.FuzzyMatchInput, + other: "FlowDataEngine", + node_logger: NodeLogger = None) -> "FlowDataEngine": + left_df, right_df = prepare_for_fuzzy_match(left=self, right=other, fuzzy_match_input=fuzzy_match_input) + fuzzy_mappings = [FuzzyMapping(**fm.__dict__) for fm in fuzzy_match_input.fuzzy_maps] + return FlowDataEngine(fuzzy_match_dfs(left_df, right_df, fuzzy_maps=fuzzy_mappings, + logger=node_logger.logger if node_logger else logger) + .lazy()) def do_cross_join(self, cross_join_input: transform_schemas.CrossJoinInput, auto_generate_selection: bool, verify_integrity: bool, @@ -1733,11 +1708,12 @@ def do_cross_join(self, cross_join_input: transform_schemas.CrossJoinInput, Exception: If `verify_integrity` is True and the join would result in an excessively large number of records. """ + self.lazy = True + other.lazy = True verify_join_select_integrity(cross_join_input, left_columns=self.columns, right_columns=other.columns) - right_select = [v.old_name for v in cross_join_input.right_select.renames if (v.keep or v.join_key) and v.is_available] left_select = [v.old_name for v in cross_join_input.left_select.renames diff --git a/flowfile_core/flowfile_core/flowfile/flow_data_engine/flow_file_column/main.py b/flowfile_core/flowfile_core/flowfile/flow_data_engine/flow_file_column/main.py index 4a450905..15230d4d 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_data_engine/flow_file_column/main.py +++ b/flowfile_core/flowfile_core/flowfile/flow_data_engine/flow_file_column/main.py @@ -76,6 +76,67 @@ def __init__(self, polars_type: PlType): self.__sql_type = None self.__perc_unique = None + def __repr__(self): + """ + Provides a concise, developer-friendly representation of the object. + Ideal for debugging and console inspection. + """ + return (f"FlowfileColumn(name='{self.column_name}', " + f"type={self.data_type}, " + f"size={self.size}, " + f"nulls={self.number_of_empty_values})") + + def __str__(self): + """ + Provides a detailed, readable summary of the column's metadata. + It conditionally omits any attribute that is None, ensuring a clean output. 
+ """ + # --- Header (Always Shown) --- + header = f"" + lines = [] + + # --- Core Attributes (Conditionally Shown) --- + if self.data_type is not None: + lines.append(f" Type: {self.data_type}") + if self.size is not None: + lines.append(f" Non-Nulls: {self.size}") + + # Calculate and display nulls if possible + if self.size is not None and self.number_of_empty_values is not None: + total_entries = self.size + self.number_of_empty_values + if total_entries > 0: + null_perc = (self.number_of_empty_values / total_entries) * 100 + null_info = f"{self.number_of_empty_values} ({null_perc:.1f}%)" + else: + null_info = "0 (0.0%)" + lines.append(f" Nulls: {null_info}") + + if self.number_of_unique_values is not None: + lines.append(f" Unique: {self.number_of_unique_values}") + + # --- Conditional Stats Section --- + stats = [] + if self.min_value is not None: + stats.append(f" Min: {self.min_value}") + if self.max_value is not None: + stats.append(f" Max: {self.max_value}") + if self.average_value is not None: + stats.append(f" Mean: {self.average_value}") + + if stats: + lines.append(" Stats:") + lines.extend(stats) + + # --- Conditional Examples Section --- + if self.example_values: + example_str = str(self.example_values) + # Truncate long example strings for cleaner display + if len(example_str) > 70: + example_str = example_str[:67] + '...' + lines.append(f" Examples: {example_str}") + + return f"{header}\n" + "\n".join(lines) + @classmethod def create_from_polars_type(cls, polars_type: PlType, **kwargs) -> "FlowfileColumn": for k, v in kwargs.items(): diff --git a/flowfile_core/flowfile_core/flowfile/flow_data_engine/fuzzy_matching/prepare_for_fuzzy_match.py b/flowfile_core/flowfile_core/flowfile/flow_data_engine/fuzzy_matching/prepare_for_fuzzy_match.py index 02070e21..701d2375 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_data_engine/fuzzy_matching/prepare_for_fuzzy_match.py +++ b/flowfile_core/flowfile_core/flowfile/flow_data_engine/fuzzy_matching/prepare_for_fuzzy_match.py @@ -1,12 +1,49 @@ -from flowfile_core.schemas.transform_schema import FuzzyMatchInput +from flowfile_core.schemas.transform_schema import FuzzyMatchInput, SelectInput, JoinInputs from flowfile_core.flowfile.flow_data_engine.join import verify_join_select_integrity, verify_join_map_integrity import polars as pl -from typing import TYPE_CHECKING, Tuple +from typing import TYPE_CHECKING, Tuple, List if TYPE_CHECKING: from flowfile_core.flowfile.flow_data_engine.flow_data_engine import FlowDataEngine +def _order_join_inputs_based_on_col_order(col_order: List[str], join_inputs: JoinInputs) -> None: + """ + Ensure that the select columns in the fuzzy match input match the order of the incoming columns. + This function modifies the join_inputs object in-place. + + Returns: + None + """ + select_map = {select.new_name: select for select in join_inputs.renames} + ordered_renames = [select_map[col] for col in col_order if col in select_map] + join_inputs.renames = ordered_renames + + +def _ensure_all_columns_have_select(left: "FlowDataEngine", + right: "FlowDataEngine", + fuzzy_match_input: FuzzyMatchInput): + """ + Ensure that all columns in the left and right FlowDataEngines are included in the fuzzy match input's select + statements. 
+ Args: + left (FlowDataEngine): + right (FlowDataEngine): + fuzzy_match_input (): + + Returns: + None + """ + right_cols_in_select = {c.old_name for c in fuzzy_match_input.right_select.renames} + left_cols_in_select = {c.old_name for c in fuzzy_match_input.left_select.renames} + + fuzzy_match_input.left_select.renames.extend( + [SelectInput(col) for col in left.columns if col not in left_cols_in_select]) + fuzzy_match_input.right_select.renames.extend( + [SelectInput(col) for col in right.columns if col not in right_cols_in_select] + ) + + def prepare_for_fuzzy_match(left: "FlowDataEngine", right: "FlowDataEngine", fuzzy_match_input: FuzzyMatchInput) -> Tuple[pl.LazyFrame, pl.LazyFrame]: """ @@ -19,14 +56,18 @@ def prepare_for_fuzzy_match(left: "FlowDataEngine", right: "FlowDataEngine", Returns: Tuple[pl.LazyFrame, pl.LazyFrame]: Prepared left and right lazy frames """ - left.lazy = True right.lazy = True + _ensure_all_columns_have_select(left, right, fuzzy_match_input) + _order_join_inputs_based_on_col_order(left.columns, fuzzy_match_input.left_select) + _order_join_inputs_based_on_col_order(right.columns, fuzzy_match_input.right_select) + verify_join_select_integrity(fuzzy_match_input, left_columns=left.columns, right_columns=right.columns) if not verify_join_map_integrity(fuzzy_match_input, left_columns=left.schema, right_columns=right.schema): raise Exception('Join is not valid by the data fields') fuzzy_match_input = fuzzy_match_input fuzzy_match_input.auto_rename() + right_select = [v.old_name for v in fuzzy_match_input.right_select.renames if (v.keep or v.join_key) and v.is_available] left_select = [v.old_name for v in fuzzy_match_input.left_select.renames if diff --git a/flowfile_core/flowfile_core/flowfile/flow_data_engine/subprocess_operations/models.py b/flowfile_core/flowfile_core/flowfile/flow_data_engine/subprocess_operations/models.py index a47e97f3..dc827dcb 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_data_engine/subprocess_operations/models.py +++ b/flowfile_core/flowfile_core/flowfile/flow_data_engine/subprocess_operations/models.py @@ -1,6 +1,6 @@ from typing import Any, Optional, Literal from pydantic import BaseModel -from flowfile_core.schemas.transform_schema import FuzzyMap +from pl_fuzzy_frame_match.models import FuzzyMapping OperationType = Literal['store', 'calculate_schema', 'calculate_number_of_records', 'write_output', 'store_sample'] @@ -20,8 +20,8 @@ class FuzzyJoinInput(BaseModel): cache_dir: Optional[str] = None left_df_operation: PolarsOperation right_df_operation: PolarsOperation - fuzzy_maps: list[FuzzyMap] - flowfile_node_id: int|str + fuzzy_maps: list[FuzzyMapping] + flowfile_node_id: int | str flowfile_flow_id: int diff --git a/flowfile_core/flowfile_core/flowfile/flow_data_engine/subprocess_operations/subprocess_operations.py b/flowfile_core/flowfile_core/flowfile/flow_data_engine/subprocess_operations/subprocess_operations.py index adad4323..7f1b5dc6 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_data_engine/subprocess_operations/subprocess_operations.py +++ b/flowfile_core/flowfile_core/flowfile/flow_data_engine/subprocess_operations/subprocess_operations.py @@ -9,11 +9,12 @@ import polars as pl import requests +from pl_fuzzy_frame_match.models import FuzzyMapping + from flowfile_core.configs import logger from flowfile_core.configs.settings import WORKER_URL from flowfile_core.flowfile.flow_data_engine.subprocess_operations.models import ( FuzzyJoinInput, - FuzzyMap, OperationType, PolarsOperation, Status @@ -53,7 +54,7 @@ def 
trigger_sample_operation(lf: pl.LazyFrame, file_ref: str, flow_id: int, node def trigger_fuzzy_match_operation(left_df: pl.LazyFrame, right_df: pl.LazyFrame, - fuzzy_maps: List[FuzzyMap], + fuzzy_maps: List[FuzzyMapping], file_ref: str, flow_id: int, node_id: int | str) -> Status: diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py index 8883e467..38b2f2e1 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_graph.py +++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py @@ -803,26 +803,34 @@ def add_fuzzy_match(self, fuzzy_settings: input_schema.NodeFuzzyMatch) -> "FlowG """ def _func(main: FlowDataEngine, right: FlowDataEngine) -> FlowDataEngine: + node = self.get_node(node_id=fuzzy_settings.node_id) + if self.execution_location == "local": + return main.fuzzy_join(fuzzy_match_input=fuzzy_settings.join_input, + other=right, + node_logger=self.flow_logger.get_node_logger(fuzzy_settings.node_id)) + f = main.start_fuzzy_join(fuzzy_match_input=fuzzy_settings.join_input, other=right, file_ref=node.hash, flow_id=self.flow_id, node_id=fuzzy_settings.node_id) logger.info("Started the fuzzy match action") - node._fetch_cached_df = f + node._fetch_cached_df = f # Add to the node so it can be cancelled and fetch later if needed return FlowDataEngine(f.get_result()) - self.add_node_step(node_id=fuzzy_settings.node_id, - function=_func, - input_columns=[], - node_type='fuzzy_match', - setting_input=fuzzy_settings) - node = self.get_node(node_id=fuzzy_settings.node_id) - def schema_callback(): - return calculate_fuzzy_match_schema(fuzzy_settings.join_input, + fm_input_copy = deepcopy(fuzzy_settings.join_input) # Deepcopy create an unique object per func + node = self.get_node(node_id=fuzzy_settings.node_id) + return calculate_fuzzy_match_schema(fm_input_copy, left_schema=node.node_inputs.main_inputs[0].schema, right_schema=node.node_inputs.right_input.schema ) - node.schema_callback = schema_callback + self.add_node_step(node_id=fuzzy_settings.node_id, + function=_func, + input_columns=[], + node_type='fuzzy_match', + setting_input=fuzzy_settings, + input_node_ids=fuzzy_settings.depending_on_ids, + schema_callback=schema_callback) + return self def add_text_to_rows(self, node_text_to_rows: input_schema.NodeTextToRows) -> "FlowGraph": diff --git a/flowfile_core/flowfile_core/flowfile/flow_node/flow_node.py b/flowfile_core/flowfile_core/flowfile/flow_node/flow_node.py index 52c51e4f..dcf61cab 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_node/flow_node.py +++ b/flowfile_core/flowfile_core/flowfile/flow_node/flow_node.py @@ -959,16 +959,16 @@ def reset(self, deep: bool = False): logger.info(f'{self.node_id}: Node needs reset') self.node_stats.has_run_with_current_setup = False self.results.reset() - if self.is_correct: - self._schema_callback = None # Ensure the schema callback is reset - if self.schema_callback: - logger.info(f'{self.node_id}: Resetting the schema callback') - self.schema_callback.start() self.node_schema.result_schema = None self.node_schema.predicted_schema = None self._hash = None self.node_information.is_setup = None self.results.errors = None + if self.is_correct: + self._schema_callback = None # Ensure the schema callback is reset + if self.schema_callback: + logger.info(f'{self.node_id}: Resetting the schema callback') + self.schema_callback.start() self.evaluate_nodes() _ = self.hash # Recalculate the hash after reset diff --git a/flowfile_core/flowfile_core/flowfile/flow_node/schema_callback.py 
b/flowfile_core/flowfile_core/flowfile/flow_node/schema_callback.py index 74d7db5a..3f9f2f0e 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_node/schema_callback.py +++ b/flowfile_core/flowfile_core/flowfile/flow_node/schema_callback.py @@ -1,71 +1,166 @@ - from typing import Callable, Any, Optional, Generic, TypeVar from concurrent.futures import ThreadPoolExecutor, Future +import threading from flowfile_core.configs import logger - T = TypeVar('T') class SingleExecutionFuture(Generic[T]): - """Single execution of a function in a separate thread with caching of the result.""" - executor: ThreadPoolExecutor - future: Optional[Future[T]] + """Thread-safe single execution of a function with result caching. + + Ensures a function is executed at most once even when called from multiple threads. + Subsequent calls return the cached result. + """ + func: Callable[[], T] on_error: Optional[Callable[[Exception], Any]] - result_value: Optional[T] - has_run_at_least_once: bool = False # Indicates if the function has been run at least once + _lock: threading.RLock + _executor: Optional[ThreadPoolExecutor] + _future: Optional[Future[T]] + _result_value: Optional[T] + _exception: Optional[Exception] + _has_completed: bool + _has_started: bool def __init__( - self, - func: Callable[[], T], - on_error: Optional[Callable[[Exception], Any]] = None + self, + func: Callable[[], T], + on_error: Optional[Callable[[Exception], Any]] = None ) -> None: """Initialize with function and optional error handler.""" - self.executor = ThreadPoolExecutor(max_workers=1) - self.future = None self.func = func self.on_error = on_error - self.result_value = None - self.has_run_at_least_once = False + + # Thread safety + self._lock = threading.RLock() # RLock allows re-entrant locking + + # Execution state + self._executor = None + self._future = None + self._result_value = None + self._exception = None + self._has_completed = False + self._has_started = False + + def _ensure_executor(self) -> ThreadPoolExecutor: + """Ensure executor exists, creating if necessary.""" + if self._executor is None or self._executor._shutdown: + self._executor = ThreadPoolExecutor(max_workers=1) + return self._executor def start(self) -> None: """Start the function execution if not already started.""" - if not self.future: - logger.info("single executor function started") - self.future = self.executor.submit(self.func) + with self._lock: + if self._has_started: + logger.info("Function already started or completed") + return + + logger.info("Starting single executor function") + executor: ThreadPoolExecutor = self._ensure_executor() + self._future = executor.submit(self._func_wrapper) + self._has_started = True + + def _func_wrapper(self) -> T: + """Wrapper to capture the result or exception.""" + try: + result: T = self.func() + with self._lock: + self._result_value = result + self._has_completed = True + return result + except Exception as e: + with self._lock: + self._exception = e + self._has_completed = True + raise def cleanup(self) -> None: - """Clean up resources by clearing the future and shutting down the executor.""" - self.has_run_at_least_once = True - self.executor.shutdown(wait=False) + """Clean up resources by shutting down the executor.""" + with self._lock: + if self._executor and not self._executor._shutdown: + self._executor.shutdown(wait=False) def __call__(self) -> Optional[T]: """Execute function if not running and return its result.""" - if self.result_value: - return self.result_value - if not self.future: - 
self.start() - else: - logger.info("Function already running or did complete") - try: - self.result_value = self.future.result() - logger.info("Done with the function") - return self.result_value - except Exception as e: - if self.on_error: - return self.on_error(e) - else: - raise e - finally: - self.cleanup() + with self._lock: + # If already completed, return cached result or raise cached exception + if self._has_completed: + if self._exception: + if self.on_error: + return self.on_error(self._exception) + else: + raise self._exception + return self._result_value + + # Start if not already started + if not self._has_started: + self.start() + + # Wait for completion outside the lock to avoid blocking other threads + if self._future: + try: + result: T = self._future.result() + logger.info("Function completed successfully") + return result + except Exception as e: + logger.error(f"Function raised exception: {e}") + if self.on_error: + return self.on_error(e) + else: + raise + + return None + + def reset(self) -> None: + """Reset the execution state, allowing the function to be run again.""" + with self._lock: + logger.info("Resetting single execution future") + + # Cancel any pending execution + if self._future and not self._future.done(): + self._future.cancel() - def reset(self): - """Reset the future and result value.""" - logger.info("Resetting the future and result value") - self.result_value = None - self.future = None + # Clean up old executor + if self._executor and not self._executor._shutdown: + self._executor.shutdown(wait=False) + + # Reset state + self._executor = None + self._future = None + self._result_value = None + self._exception = None + self._has_completed = False + self._has_started = False + + def is_running(self) -> bool: + """Check if the function is currently executing.""" + with self._lock: + return bool( + self._has_started and + not self._has_completed and + self._future is not None and + not self._future.done() + ) + + def is_completed(self) -> bool: + """Check if the function has completed execution.""" + with self._lock: + return self._has_completed + + def get_result(self) -> Optional[T]: + """Get the cached result without triggering execution.""" + with self._lock: + if self._exception: + if self.on_error: + return self.on_error(self._exception) + else: + raise self._exception + return self._result_value def __del__(self) -> None: """Ensure executor is shut down on deletion.""" - self.cleanup() + try: + self.cleanup() + except Exception: + pass diff --git a/flowfile_core/flowfile_core/flowfile/schema_callbacks.py b/flowfile_core/flowfile_core/flowfile/schema_callbacks.py index 32baaa8a..69e58e4a 100644 --- a/flowfile_core/flowfile_core/flowfile/schema_callbacks.py +++ b/flowfile_core/flowfile_core/flowfile/schema_callbacks.py @@ -1,25 +1,72 @@ from typing import List -from flowfile_core.flowfile.flow_data_engine.flow_file_column.main import FlowfileColumn, PlType -from flowfile_core.schemas import transform_schema -from flowfile_core.schemas import input_schema + from polars import datatypes import polars as pl + +from pl_fuzzy_frame_match.output_column_name_utils import set_name_in_fuzzy_mappings +from pl_fuzzy_frame_match.pre_process import rename_fuzzy_right_mapping + from flowfile_core.flowfile.flow_data_engine.subprocess_operations.subprocess_operations import fetch_unique_values from flowfile_core.configs.flow_logger import main_logger +from flowfile_core.flowfile.flow_data_engine.flow_file_column.main import FlowfileColumn, PlType +from 
flowfile_core.schemas import transform_schema +from flowfile_core.schemas import input_schema + + +def _ensure_all_columns_have_select(left_cols: List[str], + right_cols: List[str], + fuzzy_match_input: transform_schema.FuzzyMatchInput): + """ + Ensure that all columns in the left and right FlowDataEngines are included in the fuzzy match input's select + statements. + Args: + left_cols (List[str]): List of column names in the left FlowDataEngine. + right_cols (List[str]): List of column names in the right FlowDataEngine. + fuzzy_match_input (FuzzyMatchInput): Fuzzy match input configuration containing select statements. + + Returns: + None + """ + right_cols_in_select = {c.old_name for c in fuzzy_match_input.right_select.renames} + left_cols_in_select = {c.old_name for c in fuzzy_match_input.left_select.renames} + fuzzy_match_input.left_select.renames.extend( + [transform_schema.SelectInput(col) for col in left_cols if col not in left_cols_in_select]) + fuzzy_match_input.right_select.renames.extend( + [transform_schema.SelectInput(col) for col in right_cols if col not in right_cols_in_select] + ) -def calculate_uniqueness(a: float, b: float) -> float: - return ((pow(a + 0.5, 2) + pow(b + 0.5, 2)) / 2 - pow(0.5, 2)) + 0.5 * abs(a - b) + +def _order_join_inputs_based_on_col_order(col_order: List[str], join_inputs: transform_schema.JoinInputs) -> None: + """ + Ensure that the select columns in the fuzzy match input match the order of the incoming columns. + This function modifies the join_inputs object in-place. + + Returns: + None + """ + select_map = {select.new_name: select for select in join_inputs.renames} + ordered_renames = [select_map[col] for col in col_order if col in select_map] + join_inputs.renames = ordered_renames def calculate_fuzzy_match_schema(fm_input: transform_schema.FuzzyMatchInput, left_schema: List[FlowfileColumn], right_schema: List[FlowfileColumn]): - print('calculating fuzzy match schema') + _ensure_all_columns_have_select(left_cols=[col.column_name for col in left_schema], + right_cols=[col.column_name for col in right_schema], + fuzzy_match_input=fm_input) + _order_join_inputs_based_on_col_order(col_order=[col.column_name for col in left_schema], + join_inputs=fm_input.left_select) + _order_join_inputs_based_on_col_order(col_order=[col.column_name for col in right_schema], + join_inputs=fm_input.right_select) left_schema_dict, right_schema_dict = ({ls.name: ls for ls in left_schema}, {rs.name: rs for rs in right_schema}) fm_input.auto_rename() + right_renames = {column.old_name: column.new_name for column in fm_input.right_select.renames} + new_join_mapping = rename_fuzzy_right_mapping(fm_input.join_mapping, right_renames) + output_schema = [] for column in fm_input.left_select.renames: column_schema = left_schema_dict.get(column.old_name) @@ -31,9 +78,9 @@ def calculate_fuzzy_match_schema(fm_input: transform_schema.FuzzyMatchInput, if column_schema and column.keep: output_schema.append(FlowfileColumn.from_input(column.new_name, column_schema.data_type, example_values=column_schema.example_values)) - - for i, fm in enumerate(fm_input.join_mapping): - output_schema.append(FlowfileColumn.from_input(f'fuzzy_score_{i}', 'Float64')) + set_name_in_fuzzy_mappings(new_join_mapping) + output_schema.extend([FlowfileColumn.from_input(fuzzy_mapping.output_column_name, 'Float64') + for fuzzy_mapping in new_join_mapping]) return output_schema diff --git a/flowfile_core/flowfile_core/flowfile/setting_generator/settings.py 
b/flowfile_core/flowfile_core/flowfile/setting_generator/settings.py index 91fb4a26..5d14dba0 100644 --- a/flowfile_core/flowfile_core/flowfile/setting_generator/settings.py +++ b/flowfile_core/flowfile_core/flowfile/setting_generator/settings.py @@ -4,6 +4,7 @@ from functools import wraps from flowfile_core.schemas.output_model import NodeData from flowfile_core.flowfile.setting_generator.setting_generator import SettingGenerator, SettingUpdator +from pl_fuzzy_frame_match.models import FuzzyMapping setting_generator = SettingGenerator() setting_updator = SettingUpdator() @@ -135,7 +136,7 @@ def cross_join(node_data: NodeData): def check_if_fuzzy_match_is_valid(left_columns: Iterable[str], right_columns: Iterable[str], - fuzzy_map: transform_schema.FuzzyMap) -> bool: + fuzzy_map: FuzzyMapping) -> bool: if fuzzy_map.left_col not in left_columns: return False if fuzzy_map.right_col not in right_columns: diff --git a/flowfile_core/flowfile_core/schemas/transform_schema.py b/flowfile_core/flowfile_core/schemas/transform_schema.py index fef48b92..8005ad4f 100644 --- a/flowfile_core/flowfile_core/schemas/transform_schema.py +++ b/flowfile_core/flowfile_core/schemas/transform_schema.py @@ -6,6 +6,8 @@ from typing import NamedTuple +from pl_fuzzy_frame_match.models import FuzzyMapping + def get_func_type_mapping(func: str): """Infers the output data type of common aggregation functions.""" @@ -158,6 +160,19 @@ def get_select_cols(self, include_join_key: bool = True): """Gets a list of original column names to select from the source DataFrame.""" return [v.old_name for v in self.renames if v.keep or (v.join_key and include_join_key)] + def has_drop_cols(self) -> bool: + """Checks if any column is marked to be dropped from the selection.""" + return any(not v.keep for v in self.renames) + + @property + def drop_columns(self) -> List[SelectInput]: + """Returns a list of column names that are marked to be dropped from the selection.""" + return [v for v in self.renames if not v.keep and v.is_available] + + @property + def non_jk_drop_columns(self) -> List[SelectInput]: + return [v for v in self.renames if not v.keep and v.is_available and not v.join_key] + def __add__(self, other: "SelectInput"): """Allows adding a SelectInput using the '+' operator.""" self.renames.append(other) @@ -225,32 +240,6 @@ class JoinMap: right_col: str -@dataclass -class FuzzyMap(JoinMap): - """Extends `JoinMap` with settings for fuzzy string matching, such as the algorithm and similarity threshold.""" - threshold_score: Optional[float] = 80.0 - fuzzy_type: Optional[FuzzyTypeLiteral] = 'levenshtein' - perc_unique: Optional[float] = 0.0 - output_column_name: Optional[str] = None - valid: Optional[bool] = True - - def __init__(self, left_col: str, right_col: str = None, threshold_score: float = 80.0, - fuzzy_type: FuzzyTypeLiteral = 'levenshtein', perc_unique: float = 0, output_column_name: str = None, - _output_col_name: str = None, valid: bool = True, output_col_name: str = None): - if right_col is None: - right_col = left_col - self.valid = valid - self.left_col = left_col - self.right_col = right_col - self.threshold_score = threshold_score - self.fuzzy_type = fuzzy_type - self.perc_unique = perc_unique - self.output_column_name = output_column_name if output_column_name is not None else _output_col_name - self.output_column_name = self.output_column_name if self.output_column_name is not None else output_col_name - if self.output_column_name is None: - self.output_column_name = 
f'fuzzy_score_{self.left_col}_{self.right_col}' - - class JoinSelectMixin: """A mixin providing common methods for join-like operations that involve left and right inputs.""" left_select: JoinInputs = None @@ -430,32 +419,32 @@ def used_join_mapping(self) -> List[JoinMap]: @dataclass class FuzzyMatchInput(JoinInput): """Extends `JoinInput` with settings specific to fuzzy matching, such as the matching algorithm and threshold.""" - join_mapping: List[FuzzyMap] + join_mapping: List[FuzzyMapping] aggregate_output: bool = False @staticmethod - def parse_fuzz_mapping(fuzz_mapping: List[FuzzyMap] | Tuple[str, str] | str) -> List[FuzzyMap]: + def parse_fuzz_mapping(fuzz_mapping: List[FuzzyMapping] | Tuple[str, str] | str) -> List[FuzzyMapping]: if isinstance(fuzz_mapping, (tuple, list)): assert len(fuzz_mapping) > 0 if all(isinstance(fm, dict) for fm in fuzz_mapping): - fuzz_mapping = [FuzzyMap(**fm) for fm in fuzz_mapping] + fuzz_mapping = [FuzzyMapping(**fm) for fm in fuzz_mapping] - if not isinstance(fuzz_mapping[0], FuzzyMap): + if not isinstance(fuzz_mapping[0], FuzzyMapping): assert len(fuzz_mapping) <= 2 if len(fuzz_mapping) == 2: assert isinstance(fuzz_mapping[0], str) and isinstance(fuzz_mapping[1], str) - fuzz_mapping = [FuzzyMap(*fuzz_mapping)] + fuzz_mapping = [FuzzyMapping(*fuzz_mapping)] elif isinstance(fuzz_mapping[0], str): - fuzz_mapping = [FuzzyMap(fuzz_mapping[0], fuzz_mapping[0])] + fuzz_mapping = [FuzzyMapping(fuzz_mapping[0], fuzz_mapping[0])] elif isinstance(fuzz_mapping, str): - fuzz_mapping = [FuzzyMap(fuzz_mapping, fuzz_mapping)] - elif isinstance(fuzz_mapping, FuzzyMap): + fuzz_mapping = [FuzzyMapping(fuzz_mapping, fuzz_mapping)] + elif isinstance(fuzz_mapping, FuzzyMapping): fuzz_mapping = [fuzz_mapping] else: raise Exception('No valid join mapping as input') return fuzz_mapping - def __init__(self, join_mapping: List[FuzzyMap] | Tuple[str, str] | str, left_select: List[SelectInput] | List[str], + def __init__(self, join_mapping: List[FuzzyMapping] | Tuple[str, str] | str, left_select: List[SelectInput] | List[str], right_select: List[SelectInput] | List[str], aggregate_output: bool = False, how: JoinStrategy = 'inner'): self.join_mapping = self.parse_fuzz_mapping(join_mapping) self.left_select = self.parse_select(left_select) @@ -463,9 +452,9 @@ def __init__(self, join_mapping: List[FuzzyMap] | Tuple[str, str] | str, left_se self.how = how for jm in self.join_mapping: - if jm.right_col not in self.right_select.old_cols: + if jm.right_col not in {v.old_name for v in self.right_select.renames}: self.right_select.append(SelectInput(jm.right_col, keep=False, join_key=True)) - if jm.left_col not in self.left_select.old_cols: + if jm.left_col not in {v.old_name for v in self.left_select.renames}: self.left_select.append(SelectInput(jm.left_col, keep=False, join_key=True)) [setattr(v, "join_key", v.old_name in self._left_join_keys) for v in self.left_select.renames] [setattr(v, "join_key", v.old_name in self._right_join_keys) for v in self.right_select.renames] @@ -476,7 +465,7 @@ def overlapping_records(self): return self.left_select.new_cols & self.right_select.new_cols @property - def fuzzy_maps(self) -> List[FuzzyMap]: + def fuzzy_maps(self) -> List[FuzzyMapping]: """Returns the final fuzzy mappings after applying all column renames.""" new_mappings = [] left_rename_table, right_rename_table = self.left_select.rename_table, self.right_select.rename_table diff --git a/flowfile_core/tests/flowfile/flowfile_table/fuzzy_mathcing/test_prepare_for_fuzzy_match.py 
b/flowfile_core/tests/flowfile/flowfile_table/fuzzy_mathcing/test_prepare_for_fuzzy_match.py index 2c315097..b8de10c3 100644 --- a/flowfile_core/tests/flowfile/flowfile_table/fuzzy_mathcing/test_prepare_for_fuzzy_match.py +++ b/flowfile_core/tests/flowfile/flowfile_table/fuzzy_mathcing/test_prepare_for_fuzzy_match.py @@ -1,3 +1,5 @@ +from pl_fuzzy_frame_match.models import FuzzyMapping + from flowfile_core.flowfile.flow_data_engine.flow_data_engine import FlowDataEngine from flowfile_core.flowfile.flow_data_engine.fuzzy_matching.prepare_for_fuzzy_match import prepare_for_fuzzy_match from flowfile_core.schemas import transform_schema @@ -12,7 +14,7 @@ def test_prepare_for_fuzzy_match(): left_select = [transform_schema.SelectInput(c) for c in left_flowfile_table.columns] right_select = [transform_schema.SelectInput(c) for c in right_flowfile_table.columns] - fuzzy_match_input = transform_schema.FuzzyMatchInput(join_mapping=[transform_schema.FuzzyMap(left_col='name')], + fuzzy_match_input = transform_schema.FuzzyMatchInput(join_mapping=[FuzzyMapping(left_col='name')], left_select=left_select, right_select=right_select ) diff --git a/flowfile_core/tests/flowfile/flowfile_table/test_flow_data_engine.py b/flowfile_core/tests/flowfile/flowfile_table/test_flow_data_engine.py index 0c6319fc..4d157c8f 100644 --- a/flowfile_core/tests/flowfile/flowfile_table/test_flow_data_engine.py +++ b/flowfile_core/tests/flowfile/flowfile_table/test_flow_data_engine.py @@ -1,10 +1,11 @@ from flowfile_core.flowfile.flow_data_engine.flow_data_engine import FlowDataEngine, execute_polars_code from flowfile_core.flowfile.flow_data_engine.polars_code_parser import remove_comments_and_docstrings from flowfile_core.schemas import transform_schema -from flowfile_core.schemas import cloud_storage_schemas as cs_schemas import polars as pl import pytest +from pl_fuzzy_frame_match.models import FuzzyMapping + def create_sample_data(): flowfile_table = FlowDataEngine.create_random(100) @@ -12,26 +13,111 @@ def create_sample_data(): return flowfile_table -def test_fuzzy_match(): +def test_fuzzy_match_internal(): + r = transform_schema.SelectInputs([transform_schema.SelectInput(old_name='column_0', new_name='name')]) + left_flowfile_table = FlowDataEngine(['edward', 'eduward', 'court']).do_select(r) + right_flowfile_table = left_flowfile_table + left_select = [transform_schema.SelectInput(c) for c in left_flowfile_table.columns] + right_select = [transform_schema.SelectInput(c) for c in right_flowfile_table.columns] + fuzzy_match_input = transform_schema.FuzzyMatchInput(join_mapping=[FuzzyMapping(left_col='name')], + left_select=left_select, right_select=right_select + ) + fuzzy_match_result = left_flowfile_table.fuzzy_join(fuzzy_match_input, right_flowfile_table) + assert fuzzy_match_result is not None, 'Fuzzy match failed' + assert fuzzy_match_result.count() > 0, 'No fuzzy matches found' + expected_data = FlowDataEngine([{'name': 'court', 'name_vs_name_right_levenshtein': 1.0, 'name_right': 'court'}, + {'name': 'eduward', 'name_vs_name_right_levenshtein': 1.0, 'name_right': 'eduward'}, + {'name': 'edward', 'name_vs_name_right_levenshtein': 0.8571428571428572, 'name_right': 'eduward'}, + {'name': 'eduward', 'name_vs_name_right_levenshtein': 0.8571428571428572, 'name_right': 'edward'}, + {'name': 'edward', 'name_vs_name_right_levenshtein': 1.0, 'name_right': 'edward'}]) + fuzzy_match_result.assert_equal(expected_data) + + +@pytest.fixture +def fuzzy_test_data_left() -> FlowDataEngine: + """ + Generates a small, predictable test 
dataset with data designed for fuzzy matching challenges. + + Returns: + LazyFrame with left side test data + """ + return FlowDataEngine(pl.DataFrame( + { + "id": [1, 2, 3, 4, 5], + "company_name": ["Apple Inc.", "Microsft", "Amazon", "Gogle", "Facebok"], + "address": ["1 Apple Park", "One Microsoft Way", "410 Terry Ave N", "1600 Amphitheatre", "1 Hacker Way"], + "contact": ["Tim Cook", "Satya Ndella", "Andy Jessy", "Sundar Pichai", "Mark Zukerberg"], + } + )) + + +@pytest.fixture +def fuzzy_test_data_right() -> FlowDataEngine: + """ + Generates a small, predictable test dataset with variations for fuzzy matching. + + Returns: + LazyFrame with right side test data + """ + return FlowDataEngine(pl.DataFrame( + { + "id": [101, 102, 103, 104, 105], + "organization": ["Apple Incorporated", "Microsoft Corp", "Amazon.com Inc", "Google LLC", "Facebook Inc"], + "location": [ + "Apple Park, Cupertino", + "Microsoft Way, Redmond", + "Terry Ave North, Seattle", + "Amphitheatre Pkwy, Mountain View", + "Hacker Way, Menlo Park", + ], + "ceo": ["Timothy Cook", "Satya Nadella", "Andy Jassy", "Sundar Pichai", "Mark Zuckerberg"], + } + )) + + +def test_fuzzy_match_auto_select_columns_not_provided(fuzzy_test_data_left, fuzzy_test_data_right): + left_select = [transform_schema.SelectInput(c) for c in fuzzy_test_data_left.columns[:-1]] + right_select = [transform_schema.SelectInput(c) for c in fuzzy_test_data_right.columns[:-1]] + fuzzy_match_input = transform_schema.FuzzyMatchInput(join_mapping=[ + FuzzyMapping(left_col='company_name', right_col='organization', threshold_score=50) + ], left_select=left_select, right_select=right_select) + fuzzy_match_result = fuzzy_test_data_left.fuzzy_join(fuzzy_match_input, fuzzy_test_data_right) + assert fuzzy_match_result is not None, 'Fuzzy match failed' + assert fuzzy_match_result.number_of_fields == 9 + + +def test_fuzzy_match_auto_select_columns_not_selected(fuzzy_test_data_left, fuzzy_test_data_right): + left_select = [transform_schema.SelectInput(c, keep=False) for c in fuzzy_test_data_left.columns[:-1]] + right_select = [transform_schema.SelectInput(c, keep=False) for c in fuzzy_test_data_right.columns] + fuzzy_match_input = transform_schema.FuzzyMatchInput(join_mapping=[ + FuzzyMapping(left_col='company_name', right_col='organization', threshold_score=50) + ], left_select=left_select, right_select=right_select) + fuzzy_match_result = fuzzy_test_data_left.fuzzy_join(fuzzy_match_input, fuzzy_test_data_right) + assert fuzzy_match_result is not None, 'Fuzzy match failed' + assert fuzzy_match_result.number_of_fields == 4 + + +def test_fuzzy_match_external(): r = transform_schema.SelectInputs([transform_schema.SelectInput(old_name='column_0', new_name='name')]) left_flowfile_table = FlowDataEngine(['edward', 'eduward', 'court']).do_select(r) right_flowfile_table = left_flowfile_table left_select = [transform_schema.SelectInput(c) for c in left_flowfile_table.columns] right_select = [transform_schema.SelectInput(c) for c in right_flowfile_table.columns] - fuzzy_match_input = transform_schema.FuzzyMatchInput(join_mapping=[transform_schema.FuzzyMap(left_col='name')], + fuzzy_match_input = transform_schema.FuzzyMatchInput(join_mapping=[FuzzyMapping(left_col='name')], left_select=left_select, right_select=right_select ) - fuzzy_match_result = left_flowfile_table.do_fuzzy_join(fuzzy_match_input, right_flowfile_table, 'test') + fuzzy_match_result = left_flowfile_table.fuzzy_join_external(fuzzy_match_input, right_flowfile_table) assert fuzzy_match_result is not None, 'Fuzzy 
match failed' assert fuzzy_match_result.count() > 0, 'No fuzzy matches found' - expected_data = FlowDataEngine([{'name': 'court', 'fuzzy_score_0': 1.0, 'name_right': 'court'}, - {'name': 'eduward', 'fuzzy_score_0': 1.0, 'name_right': 'eduward'}, - {'name': 'edward', 'fuzzy_score_0': 0.8571428571428572, 'name_right': 'eduward'}, - {'name': 'eduward', 'fuzzy_score_0': 0.8571428571428572, 'name_right': 'edward'}, - {'name': 'edward', 'fuzzy_score_0': 1.0, 'name_right': 'edward'}]) + expected_data = FlowDataEngine([{'name': 'court', 'name_vs_name_right_levenshtein': 1.0, 'name_right': 'court'}, + {'name': 'eduward', 'name_vs_name_right_levenshtein': 1.0, 'name_right': 'eduward'}, + {'name': 'edward', 'name_vs_name_right_levenshtein': 0.8571428571428572, 'name_right': 'eduward'}, + {'name': 'eduward', 'name_vs_name_right_levenshtein': 0.8571428571428572, 'name_right': 'edward'}, + {'name': 'edward', 'name_vs_name_right_levenshtein': 1.0, 'name_right': 'edward'}]) fuzzy_match_result.assert_equal(expected_data) + def test_cross_join(): left_flowfile_table = FlowDataEngine.create_random(100) right_flowfile_table = FlowDataEngine.create_random(100) diff --git a/flowfile_core/tests/flowfile/test_code_generator.py b/flowfile_core/tests/flowfile/test_code_generator.py index b4274aa3..4c3191b9 100644 --- a/flowfile_core/tests/flowfile/test_code_generator.py +++ b/flowfile_core/tests/flowfile/test_code_generator.py @@ -4,6 +4,9 @@ from polars.testing import assert_frame_equal from pathlib import Path from uuid import uuid4 + +from pl_fuzzy_frame_match.models import FuzzyMapping + from flowfile_core.flowfile.flow_graph import FlowGraph, add_connection from flowfile_core.schemas import input_schema, transform_schema, schemas, cloud_storage_schemas as cloud_ss from flowfile_core.flowfile.code_generator.code_generator import export_flow_to_polars @@ -226,6 +229,39 @@ def join_input_dataset() -> tuple[input_schema.NodeManualInput, input_schema.Nod return left_data, right_data +@pytest.fixture +def fuzzy_join_left_data() -> input_schema.NodeManualInput: + return input_schema.NodeManualInput( + flow_id=1, + node_id=1, + raw_data_format=input_schema.RawData( + columns=[ + input_schema.MinimalFieldInfo(name="id", data_type="Integer"), + input_schema.MinimalFieldInfo(name="name", data_type="String"), + input_schema.MinimalFieldInfo(name="address", data_type="String"), + ], + data=[[1, 2, 3, 4, 5], ["Edward", "Eduward", "Edvard", "Charles", "Charlie"], + ["123 Main Str", "123 Main Street", "456 Elm Str", "789 Oak Str", "789 Oak Street"]] + ) + ) + + +@pytest.fixture +def fuzzy_join_right_data() -> input_schema.NodeManualInput: + return input_schema.NodeManualInput( + flow_id=1, + node_id=2, + raw_data_format=input_schema.RawData( + columns=[ + input_schema.MinimalFieldInfo(name="first_name", data_type="String"), + input_schema.MinimalFieldInfo(name="street", data_type="String"), + ], + data=[[1, 2, 3, 4, 5], ["Edward", "Eduward", "Edvard", "Charles", "Charlie"], + ["main street 123", "main street 123", "elm street 456", "oak street 789", "oak street 789"]] + ) + ) + + @pytest.fixture def join_input_large_dataset() -> tuple[input_schema.NodeManualInput, input_schema.NodeManualInput]: data_engine = FlowDataEngine.create_random(100) @@ -2691,3 +2727,52 @@ def test_cloud_storage_writer(file_format): cloud_ss.CloudStorageReadSettingsInternal(read_settings=read_settings, connection=get_cloud_connection()) ) assert fde.collect()[0, 0] == 5 + + +def test_fuzzy_match_single_file(fuzzy_join_left_data): + flow = 
create_basic_flow(1) + flow.add_manual_input(fuzzy_join_left_data) + settings = input_schema.NodeFuzzyMatch(flow_id=1, node_id=2, description='', auto_generate_selection=True, + join_input=transform_schema.FuzzyMatchInput( + join_mapping=[FuzzyMapping('name',threshold_score=75.0)], + left_select=[transform_schema.SelectInput(old_name='id', keep=True), + transform_schema.SelectInput(old_name='name', keep=True), + transform_schema.SelectInput(old_name='address', keep=True)], + right_select=[transform_schema.SelectInput(old_name='id', keep=True), + transform_schema.SelectInput(old_name='name', keep=True), + transform_schema.SelectInput(old_name='address', keep=True)], + ), auto_keep_all=True) + flow.add_fuzzy_match(settings) + + add_connection(flow, input_schema.NodeConnection.create_from_simple_input(1, 2, input_type="main")) + add_connection(flow, input_schema.NodeConnection.create_from_simple_input(1, 2, input_type="right")) + + code = export_flow_to_polars(flow) + + verify_if_execute(code) + result=get_result_from_generated_code(code) + expected_df = flow.get_node(2).get_resulting_data().data_frame + assert_frame_equal(result, expected_df, check_dtype=False, check_row_order=False) + + +def test_fuzzy_match_single_multiple_columns_file(fuzzy_join_left_data): + flow = create_basic_flow(1) + flow.add_manual_input(fuzzy_join_left_data) + settings = input_schema.NodeFuzzyMatch(flow_id=1, node_id=2, description='', auto_generate_selection=True, + join_input=transform_schema.FuzzyMatchInput( + join_mapping=[FuzzyMapping('name',threshold_score=75.0)], + left_select=[transform_schema.SelectInput(old_name='name', keep=True), + transform_schema.SelectInput(old_name='id', keep=True)], + right_select=[transform_schema.SelectInput(old_name='name', keep=True), + transform_schema.SelectInput(old_name='id', keep=False)], + ), auto_keep_all=True) + flow.add_fuzzy_match(settings) + add_connection(flow, input_schema.NodeConnection.create_from_simple_input(1, 2, input_type="main")) + add_connection(flow, input_schema.NodeConnection.create_from_simple_input(1, 2, input_type="right")) + + code = export_flow_to_polars(flow) + + verify_if_execute(code) + result = get_result_from_generated_code(code) + expected_df = flow.get_node(2).get_resulting_data().data_frame + assert_frame_equal(result, expected_df, check_dtype=False, check_row_order=False) diff --git a/flowfile_core/tests/flowfile/test_flowfile.py b/flowfile_core/tests/flowfile/test_flowfile.py index 66aac153..8649ccae 100644 --- a/flowfile_core/tests/flowfile/test_flowfile.py +++ b/flowfile_core/tests/flowfile/test_flowfile.py @@ -14,11 +14,12 @@ from flowfile_core.flowfile.flow_data_engine.flow_file_column.main import FlowfileColumn from flowfile_core.flowfile.schema_callbacks import pre_calculate_pivot_schema + import pytest from pathlib import Path from typing import List, Dict, Literal from copy import deepcopy - +from time import sleep try: from tests.flowfile_core_test_utils import (is_docker_available, ensure_password_is_available) @@ -308,13 +309,48 @@ def test_add_fuzzy_match(): run_info = graph.run_graph() handle_run_info(run_info) output_data = graph.get_node(2).get_resulting_data() - expected_data = FlowDataEngine([{'name': 'eduward', 'fuzzy_score_0': 0.8571428571428572, 'name_right': 'edward'}, - {'name': 'edward', 'fuzzy_score_0': 1.0, 'name_right': 'edward'}, - {'name': 'eduward', 'fuzzy_score_0': 1.0, 'name_right': 'eduward'}, - {'name': 'edward', 'fuzzy_score_0': 0.8571428571428572, 'name_right': 'eduward'}, - {'name': 'courtney', 
'fuzzy_score_0': 1.0, 'name_right': 'courtney'}] - ) + output_data.to_dict() + expected_data = FlowDataEngine({ + 'name': ['edward', 'eduward', 'courtney', 'edward', 'eduward'], + 'name_right': ['edward', 'edward', 'courtney', 'eduward', 'eduward'], + 'name_vs_name_right_levenshtein': [1.0, 0.8571428571428572, 1.0, 0.8571428571428572, 1.0]} + ) + output_data.assert_equal(expected_data) + + +def test_add_fuzzy_match_lcoal(): + from flowfile_core.configs.settings import OFFLOAD_TO_WORKER + + graph = create_graph() + graph.flow_settings.execution_location = "local" + OFFLOAD_TO_WORKER.value = False + input_data = [{'name': 'eduward'}, + {'name': 'edward'}, + {'name': 'courtney'}] + add_manual_input(graph, data=input_data) + add_node_promise_on_type(graph, 'fuzzy_match', 2) + left_connection = input_schema.NodeConnection.create_from_simple_input(1, 2) + right_connection = input_schema.NodeConnection.create_from_simple_input(1, 2) + right_connection.input_connection.connection_class = 'input-1' + add_connection(graph, left_connection) + add_connection(graph, right_connection) + data = {'flow_id': 1, 'node_id': 2, 'cache_results': False, 'join_input': + {'join_mapping': [{'left_col': 'name', 'right_col': 'name', 'threshold_score': 75, 'fuzzy_type': 'levenshtein', + 'valid': True}], + 'left_select': {'renames': [{'old_name': 'name', 'new_name': 'name', 'join_key': True, }]}, + 'right_select': {'renames': [{'old_name': 'name', 'new_name': 'name', 'join_key': True, }]}, + 'how': 'inner'}, 'auto_keep_all': True, 'auto_keep_right': True, 'auto_keep_left': True} + graph.add_fuzzy_match(input_schema.NodeFuzzyMatch(**data)) + run_info = graph.run_graph() + handle_run_info(run_info) + output_data = graph.get_node(2).get_resulting_data() + expected_data = FlowDataEngine({ + 'name': ['edward', 'eduward', 'courtney', 'edward', 'eduward'], + 'name_right': ['edward', 'edward', 'courtney', 'eduward', 'eduward'], + 'name_vs_name_right_levenshtein': [1.0, 0.8571428571428572, 1.0, 0.8571428571428572, 1.0]} + ) output_data.assert_equal(expected_data) + OFFLOAD_TO_WORKER.value = True def test_add_record_count(): @@ -1058,7 +1094,7 @@ def test_schema_callback_cloud_read(flow_logger): cloud_storage_settings=read_settings) graph.add_cloud_storage_reader(node_settings) node = graph.get_node(1) - assert node.schema_callback.future is not None, 'Schema callback future should be set' + assert node.schema_callback._future is not None, 'Schema callback future should be set' assert len(node.schema_callback()) == 4, 'Schema should have 4 columns' original_schema_callback = id(node.schema_callback) graph.add_cloud_storage_reader(node_settings) @@ -1154,6 +1190,7 @@ def test_no_re_calculate_example_data_after_change_no_run(): OFFLOAD_TO_WORKER.value = True + def test_add_fuzzy_match_only_local(): from flowfile_core.configs.settings import OFFLOAD_TO_WORKER OFFLOAD_TO_WORKER.value = False @@ -1179,11 +1216,53 @@ def test_add_fuzzy_match_only_local(): run_info = graph.run_graph() handle_run_info(run_info) output_data = graph.get_node(2).get_resulting_data() - expected_data = FlowDataEngine([{'name': 'eduward', 'fuzzy_score_0': 0.8571428571428572, 'name_right': 'edward'}, - {'name': 'edward', 'fuzzy_score_0': 1.0, 'name_right': 'edward'}, - {'name': 'eduward', 'fuzzy_score_0': 1.0, 'name_right': 'eduward'}, - {'name': 'edward', 'fuzzy_score_0': 0.8571428571428572, 'name_right': 'eduward'}, - {'name': 'courtney', 'fuzzy_score_0': 1.0, 'name_right': 'courtney'}] - ) + expected_data = FlowDataEngine( + {'name': ['courtney', 
'eduward', 'edward', 'eduward', 'edward'], + 'name_right': ['courtney', 'edward', 'edward', 'eduward', 'eduward'], + 'name_vs_name_right_levenshtein': [1.0, 0.8571428571428572, 1.0, 1.0, 0.8571428571428572]} + ) output_data.assert_equal(expected_data) - OFFLOAD_TO_WORKER.value = True \ No newline at end of file + OFFLOAD_TO_WORKER.value = True + + +def test_fuzzy_match_schema_predict(flow_logger): + graph = create_graph() + input_data = [{'name': 'eduward'}, + {'name': 'edward'}, + {'name': 'courtney'}] + add_manual_input(graph, data=input_data) + add_node_promise_on_type(graph, 'fuzzy_match', 2) + left_connection = input_schema.NodeConnection.create_from_simple_input(1, 2) + right_connection = input_schema.NodeConnection.create_from_simple_input(1, 2) + right_connection.input_connection.connection_class = 'input-1' + add_connection(graph, left_connection) + add_connection(graph, right_connection) + data = {'flow_id': 1, 'node_id': 2, 'cache_results': False, 'join_input': + {'join_mapping': [{'left_col': 'name', 'right_col': 'name', 'threshold_score': 75, 'fuzzy_type': 'levenshtein', + 'valid': True}], + 'left_select': {'renames': [{'old_name': 'name', 'new_name': 'name', 'join_key': True, }]}, + 'right_select': {'renames': [{'old_name': 'name', 'new_name': 'name', 'join_key': True, }]}, + 'how': 'inner'}, 'auto_keep_all': True, 'auto_keep_right': True, 'auto_keep_left': True} + graph.add_fuzzy_match(input_schema.NodeFuzzyMatch(**data)) + node = graph.get_node(2) + org_func = node._function + + def test_func(*args, **kwargs): + raise ValueError('This is a test error') + node._function = test_func + # enforce to calculate the data based on the schema + predicted_data = node.get_predicted_resulting_data() + assert predicted_data.columns == ['name', 'name_right', 'name_vs_name_right_levenshtein'] + input_data = [{'name': 'eduward', 'other_field': 'test'}, + {'name': 'edward'}, + {'name': 'courtney'}] + add_manual_input(graph, data=input_data) + sleep(0.1) + predicted_data = node.get_predicted_resulting_data() # Gives none because the schema predict is programmed to run only once. 
+ flow_logger.info("This is the test")
+ flow_logger.info(str(len(predicted_data.columns)))
+ flow_logger.warning(str(predicted_data.collect()))
+ assert len(predicted_data.columns) == 5
+ node._function = org_func  # Restore the original function
+ result = node.get_resulting_data()
+ assert result.columns == predicted_data.columns
diff --git a/flowfile_frame/flowfile_frame/__init__.py b/flowfile_frame/flowfile_frame/__init__.py
index 058a64f7..b7275535 100644
--- a/flowfile_frame/flowfile_frame/__init__.py
+++ b/flowfile_frame/flowfile_frame/__init__.py
@@ -2,11 +2,11 @@
 """A Polars-like API for building ETL graphs."""
 from flowfile_core.configs.settings import OFFLOAD_TO_WORKER
-
 OFFLOAD_TO_WORKER.value = False

 # Core classes
 from flowfile_frame.flow_frame import FlowFrame  # noqa: F401
+from pl_fuzzy_frame_match.models import FuzzyMapping  # noqa: F401
 from flowfile_frame.utils import create_flow_graph  # noqa: F401
diff --git a/flowfile_frame/flowfile_frame/flow_frame.py b/flowfile_frame/flowfile_frame/flow_frame.py
index ad2e5c72..7ae00d48 100644
--- a/flowfile_frame/flowfile_frame/flow_frame.py
+++ b/flowfile_frame/flowfile_frame/flow_frame.py
@@ -5,11 +5,13 @@
 import re
 import polars as pl
-from polars._typing import (CsvEncoding)
 from flowfile_frame.lazy_methods import add_lazyframe_methods
-from polars._typing import (FrameInitTypes, SchemaDefinition, SchemaDict, Orientation)
+from polars._typing import (CsvEncoding, FrameInitTypes, SchemaDefinition, SchemaDict, Orientation)
 from collections.abc import Iterator
+
+from pl_fuzzy_frame_match import FuzzyMapping, fuzzy_match_dfs
+
 from flowfile_core.flowfile.flow_graph import FlowGraph, add_connection
 from flowfile_core.flowfile.flow_graph_utils import combine_flow_graphs_with_mapping
 from flowfile_core.flowfile.flow_data_engine.flow_data_engine import FlowDataEngine
@@ -20,8 +22,7 @@
 from flowfile_frame.selectors import Selector
 from flowfile_frame.group_frame import GroupByFrame
 from flowfile_frame.utils import (_parse_inputs_as_iterable, create_flow_graph, stringify_values,
- ensure_inputs_as_iterable, generate_node_id,
- set_node_id, data as node_id_data)
+ ensure_inputs_as_iterable, generate_node_id, data as node_id_data)
 from flowfile_frame.join import _normalize_columns_to_list, _create_join_mappings
 from flowfile_frame.utils import _check_if_convertible_to_code
 from flowfile_frame.config import logger
@@ -2109,6 +2110,34 @@ def explode(
 return self._create_child_frame(new_node_id)

+ def fuzzy_match(self,
+ other: "FlowFrame",
+ fuzzy_mappings: List[FuzzyMapping],
+ description: str = None,
+ ) -> "FlowFrame":
+ """Fuzzy match this FlowFrame against another FlowFrame using the given fuzzy mappings."""
+ self._ensure_same_graph(other)
+
+ # Generate a new node id for the fuzzy match node
+ new_node_id = generate_node_id()
+ node_fuzzy_match = input_schema.NodeFuzzyMatch(flow_id=self.flow_graph.flow_id,
+ node_id=new_node_id,
+ join_input=
+ transform_schema.FuzzyMatchInput(join_mapping=fuzzy_mappings,
+ left_select=self.columns,
+ right_select=other.columns),
+ description=description or "Fuzzy match between two FlowFrames",
+ depending_on_ids=[self.node_id, other.node_id],
+ )
+ self.flow_graph.add_fuzzy_match(node_fuzzy_match)
+ self._add_connection(self.node_id, new_node_id, "main")
+ other._add_connection(other.node_id, new_node_id, "right")
+ return FlowFrame(
+ data=self.flow_graph.get_node(new_node_id).get_resulting_data().data_frame,
+ flow_graph=self.flow_graph,
+ node_id=new_node_id,
+ parent_node_id=self.node_id,
+ )
+
 def text_to_rows(
 self,
 column: str | Column,
diff --git a/flowfile_frame/flowfile_frame/flow_frame.pyi b/flowfile_frame/flowfile_frame/flow_frame.pyi
index 1c6df182..0c2b5d16 100644
--- a/flowfile_frame/flowfile_frame/flow_frame.pyi
+++ b/flowfile_frame/flowfile_frame/flow_frame.pyi
@@ -208,6 +208,8 @@ class FlowFrame:
 # Get the first row of the DataFrame.
 def first(self, description: Optional[str] = None) -> 'FlowFrame': ...

+ def fuzzy_match(self, other: FlowFrame, fuzzy_mappings: typing.List[flowfile_core.schemas.transform_schema.FuzzyMap], description: str = None) -> 'FlowFrame': ...
+
 # Take every nth row in the LazyFrame and return as a new LazyFrame.
 def gather_every(self, n: int, offset: int = 0, description: Optional[str] = None) -> 'FlowFrame': ...

diff --git a/flowfile_frame/tests/test_flow_frame.py b/flowfile_frame/tests/test_flow_frame.py
index 80c62d33..857fe5f3 100644
--- a/flowfile_frame/tests/test_flow_frame.py
+++ b/flowfile_frame/tests/test_flow_frame.py
@@ -15,6 +15,7 @@
 from polars.testing import assert_frame_equal
 from flowfile_frame.flow_frame_methods import read_csv
 from flowfile_frame.expr import col
+from pl_fuzzy_frame_match.models import FuzzyMapping


 try:
@@ -822,4 +823,26 @@ def test_read_csv_integration():
 finally:
 # Clean up
 os.unlink(tmp1_path)
- os.unlink(tmp2_path)
\ No newline at end of file
+ os.unlink(tmp2_path)
+
+
+def test_fuzzy_match():
+ """Test fuzzy matching operations."""
+ from flowfile_core.schemas.input_schema import MinimalFieldInfo, RawData
+ left_data = FlowFrame({"id": [1, 2, 3, 4, 5], "street": ["123 Main St", "456 Elm St", "789 Maple Ave", "101 Oak St", "202 Pine St"]})
+ right_data = FlowFrame({"id": [1, 2, 3], "street": ["123 Main Street", "456 Elm Street", "789 Maple Avenue"]})
+
+ fuzzy_match_input = FuzzyMapping("street", threshold_score=40)
+
+ # Fuzzy match on street
+ result = left_data.fuzzy_match(right_data, [fuzzy_match_input]).collect()
+
+ expected_df = pl.DataFrame({
+ 'id': [1, 4, 3, 2],
+ 'street': ['123 Main St', '101 Oak St', '789 Maple Ave', '456 Elm St'],
+ 'id_right': [1, 1, 3, 2], 'street_right': ['123 Main Street', '123 Main Street', '789 Maple Avenue', '456 Elm Street'],
+ 'street_vs_street_right_levenshtein': [0.7333333333333334, 0.4, 0.8125, 0.7142857142857143]}
+ )
+ assert_frame_equal(result, expected_df, check_row_order=False, check_exact=False)
+
+
diff --git a/flowfile_worker/flowfile_worker/funcs.py b/flowfile_worker/flowfile_worker/funcs.py
index 79f176b4..f63bbee9 100644
--- a/flowfile_worker/flowfile_worker/funcs.py
+++ b/flowfile_worker/flowfile_worker/funcs.py
@@ -2,8 +2,9 @@
 import io
 from typing import List, Dict, Callable
 from multiprocessing import Array, Value, Queue
-from flowfile_worker.polars_fuzzy_match.matcher import fuzzy_match_dfs
-from flowfile_worker.polars_fuzzy_match.models import FuzzyMapping
+
+from pl_fuzzy_frame_match import fuzzy_match_dfs, FuzzyMapping
+
 from flowfile_worker.flow_logger import get_worker_logger
 from flowfile_worker.external_sources.sql_source.models import DatabaseWriteSettings
 from flowfile_worker.external_sources.sql_source.main import write_df_to_database
@@ -33,7 +34,10 @@ def fuzzy_join_task(left_serializable_object: bytes, right_serializable_object:
 flowfile_logger.info("Starting fuzzy join operation")
 left_df = pl.LazyFrame.deserialize(io.BytesIO(left_serializable_object))
 right_df = pl.LazyFrame.deserialize(io.BytesIO(right_serializable_object))
- fuzzy_match_result = fuzzy_match_dfs(left_df, right_df, fuzzy_maps, flowfile_logger)
+ fuzzy_match_result = fuzzy_match_dfs(left_df=left_df,
+ right_df=right_df,
+ fuzzy_maps=fuzzy_maps,
+ logger=flowfile_logger)
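+ # fuzzy_match_dfs (pl_fuzzy_frame_match) is expected to return an eager Polars DataFrame here;
+ # it is written to `file_path` as an IPC file below so the core process can pick up the result.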
flowfile_logger.info("Fuzzy join operation completed successfully") fuzzy_match_result.write_ipc(file_path) with progress.get_lock(): diff --git a/flowfile_worker/flowfile_worker/models.py b/flowfile_worker/flowfile_worker/models.py index cb201e58..675b3cb2 100644 --- a/flowfile_worker/flowfile_worker/models.py +++ b/flowfile_worker/flowfile_worker/models.py @@ -1,7 +1,9 @@ from pydantic import BaseModel from typing import Optional, Literal, Any from base64 import decodebytes -from flowfile_worker.polars_fuzzy_match.models import FuzzyMapping + +from pl_fuzzy_frame_match import FuzzyMapping + from flowfile_worker.external_sources.sql_source.models import DatabaseWriteSettings from flowfile_worker.external_sources.s3_source.models import CloudStorageWriteSettings diff --git a/flowfile_worker/flowfile_worker/polars_fuzzy_match/__init__.py b/flowfile_worker/flowfile_worker/polars_fuzzy_match/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/flowfile_worker/flowfile_worker/polars_fuzzy_match/matcher.py b/flowfile_worker/flowfile_worker/polars_fuzzy_match/matcher.py deleted file mode 100644 index 04cb96ce..00000000 --- a/flowfile_worker/flowfile_worker/polars_fuzzy_match/matcher.py +++ /dev/null @@ -1,435 +0,0 @@ -import polars as pl -from typing import List, Optional, Tuple -import tempfile -from logging import Logger - -from flowfile_worker.polars_fuzzy_match.process import calculate_and_parse_fuzzy, process_fuzzy_frames -from flowfile_worker.polars_fuzzy_match.pre_process import pre_process_for_fuzzy_matching -from flowfile_worker.polars_fuzzy_match.models import FuzzyMapping -from flowfile_worker.polars_fuzzy_match.utils import cache_polars_frame_to_temp -from flowfile_worker.utils import collect_lazy_frame -import polars_simed as ps - - -HAS_POLARS_SIM = True - - -def ensure_left_is_larger(left_df: pl.DataFrame, - right_df: pl.DataFrame, - left_col_name: str, - right_col_name: str) -> tuple: - """ - Ensures that the left dataframe is always the larger one. - If the right dataframe is larger, swaps them. - - Args: - left_df: The left dataframe - right_df: The right dataframe - left_col_name: Column name for the left dataframe - right_col_name: Column name for the right dataframe - - Returns: - tuple: (left_df, right_df, left_col_name, right_col_name) - """ - left_frame_len = left_df.select(pl.len())[0, 0] - right_frame_len = right_df.select(pl.len())[0, 0] - - # Swap dataframes if right is larger than left - if right_frame_len > left_frame_len: - return right_df, left_df, right_col_name, left_col_name - - return left_df, right_df, left_col_name, right_col_name - - -def split_dataframe(df: pl.DataFrame, max_chunk_size: int = 500_000) -> List[pl.DataFrame]: - """ - Split a Polars DataFrame into multiple DataFrames with a maximum size. 
- - Args: - df: The Polars DataFrame to split - max_chunk_size: Maximum number of rows per chunk (default: 500,000) - - Returns: - List of Polars DataFrames, each containing at most max_chunk_size rows - """ - total_rows = df.select(pl.len())[0, 0] - - # If DataFrame is smaller than max_chunk_size, return it as is - if total_rows <= max_chunk_size: - return [df] - - # Calculate number of chunks needed - num_chunks = (total_rows + max_chunk_size - 1) // max_chunk_size # Ceiling division - - chunks = [] - for i in range(num_chunks): - start_idx = i * max_chunk_size - end_idx = min((i + 1) * max_chunk_size, total_rows) - - # Extract chunk using slice - chunk = df.slice(start_idx, end_idx - start_idx) - chunks.append(chunk) - - return chunks - - -def cross_join_large_files(left_fuzzy_frame: pl.LazyFrame, - right_fuzzy_frame: pl.LazyFrame, - left_col_name: str, - right_col_name: str, - flowfile_logger: Logger, - ) -> pl.LazyFrame: - if not HAS_POLARS_SIM: - raise Exception('The polars-sim library is required to perform this operation.') - - left_df = collect_lazy_frame(left_fuzzy_frame) - right_df = collect_lazy_frame(right_fuzzy_frame) - - left_df, right_df, left_col_name, right_col_name = ensure_left_is_larger( - left_df, right_df, left_col_name, right_col_name - ) - left_chunks = split_dataframe(left_df, max_chunk_size=500_000) # Reduced chunk size - flowfile_logger.info(f"Splitting left dataframe into {len(left_chunks)} chunks.") - df_matches = [] - - # Process each chunk combination with error handling - for i, left_chunk in enumerate(left_chunks): - chunk_matches = ps.join_sim( - left=left_chunk, - right=right_df, - left_on=left_col_name, - right_on=right_col_name, - top_n=100, - add_similarity=False, - ) - flowfile_logger.info(f"Processed chunk {int(i)} with {len(chunk_matches)} matches.") - df_matches.append(chunk_matches) - - - # Combine all matches - if df_matches: - return pl.concat(df_matches).lazy() - else: - columns = list(set(left_df.columns).union(set(right_df.columns))) - return pl.DataFrame(schema={col: pl.Null for col in columns}).lazy() - - -def cross_join_small_files(left_df: pl.LazyFrame, right_df: pl.LazyFrame) -> pl.LazyFrame: - return left_df.join(right_df, how='cross') - - -def cross_join_filter_existing_fuzzy_results(left_df: pl.LazyFrame, right_df: pl.LazyFrame, - existing_matches: pl.LazyFrame, - left_col_name: str, right_col_name: str): - """ - Process and filter fuzzy matching results by joining dataframes using existing match indices. - - This function takes previously identified fuzzy matches (existing_matches) and performs - a series of operations to create a refined dataset of matches between the left and right - dataframes, preserving index relationships. - - Parameters: - ----------- - left_df : pl.LazyFrame - The left dataframe containing records to be matched. - right_df : pl.LazyFrame - The right dataframe containing records to be matched against. - existing_matches : pl.LazyFrame - A dataframe containing the indices of already identified matches between - left_df and right_df, with columns '__left_index' and '__right_index'. - left_col_name : str - The column name from left_df to include in the result. - right_col_name : str - The column name from right_df to include in the result. - - Returns: - -------- - pl.LazyFrame - A dataframe containing the unique matches between left_df and right_df, - with index information for both dataframes preserved. 
The resulting dataframe - includes the specified columns from both dataframes along with their respective - index aggregations. - - Notes: - ------ - The function performs these operations: - 1. Join existing matches with both dataframes using their respective indices - 2. Select only the relevant columns and remove duplicates - 3. Create aggregations that preserve the relationship between values and their indices - 4. Join these aggregations back to create the final result set - """ - joined_df = (existing_matches - .select(['__left_index', '__right_index']) - .join(left_df, on='__left_index') - .join(right_df, on='__right_index') - .select(left_col_name, right_col_name, '__left_index', '__right_index') - ) - return joined_df.group_by([left_col_name, right_col_name]).agg('__left_index', '__right_index') - - -def cross_join_no_existing_fuzzy_results(left_df: pl.LazyFrame, right_df: pl.LazyFrame, left_col_name: str, - right_col_name: str, temp_dir_ref: str, - flowfile_logger: Logger) -> pl.LazyFrame: - """ - Generate fuzzy matching results by performing a cross join between dataframes. - - This function processes the input dataframes, determines the appropriate cross join method - based on the size of the resulting cartesian product, and returns the cross-joined results - for fuzzy matching when no existing matches are provided. - - Parameters: - ----------- - left_df : pl.LazyFrame - The left dataframe containing records to be matched. - right_df : pl.LazyFrame - The right dataframe containing records to be matched against. - left_col_name : str - The column name from left_df to use for fuzzy matching. - right_col_name : str - The column name from right_df to use for fuzzy matching. - temp_dir_ref : str - Reference to a temporary directory where intermediate results can be stored - during processing of large dataframes. - - Returns: - -------- - pl.LazyFrame - A dataframe containing the cross join results of left_df and right_df, - prepared for fuzzy matching operations. - - Notes: - ------ - The function performs these operations: - 1. Processes input frames using the process_fuzzy_frames helper function - 2. Calculates the size of the cartesian product to determine processing approach - 3. Uses either cross_join_large_files or cross_join_small_files based on the size: - - For cartesian products > 100M but < 1T (or 10M without polars-sim), uses large file method - - For smaller products, uses the small file method - 4. Raises an exception if the cartesian product exceeds the maximum allowed size - - Raises: - ------- - Exception - If the cartesian product of the two dataframes exceeds the maximum allowed size - (1 trillion with polars-sim, 100 million without). 
- """ - (left_fuzzy_frame, - right_fuzzy_frame, - left_col_name, - right_col_name, - len_left_df, - len_right_df) = process_fuzzy_frames(left_df=left_df, right_df=right_df, left_col_name=left_col_name, - right_col_name=right_col_name, temp_dir_ref=temp_dir_ref) - cartesian_size = len_left_df * len_right_df - max_size = 100_000_000_000_000 if HAS_POLARS_SIM else 10_000_000 - if cartesian_size > max_size: - flowfile_logger.error(f'The cartesian product of the two dataframes is too large to process: {cartesian_size}') - raise Exception('The cartesian product of the two dataframes is too large to process.') - if cartesian_size > 100_000_000: - flowfile_logger.info('Performing approximate fuzzy match for large dataframes to reduce memory usage.') - cross_join_frame = cross_join_large_files(left_fuzzy_frame, right_fuzzy_frame, left_col_name=left_col_name, - right_col_name=right_col_name, flowfile_logger=flowfile_logger) - else: - cross_join_frame = cross_join_small_files(left_fuzzy_frame, right_fuzzy_frame) - return cross_join_frame - - -def unique_df_large(_df: pl.DataFrame | pl.LazyFrame, cols: Optional[List[str]] = None) -> pl.DataFrame: - """ - Efficiently compute unique rows in large dataframes by partitioning. - - This function processes large dataframes by first partitioning them by a selected column, - then finding unique combinations within each partition before recombining the results. - This approach is more memory-efficient for large datasets than calling .unique() directly. - - Parameters: - ----------- - _df : pl.DataFrame | pl.LazyFrame - The input dataframe to process. Can be either a Polars DataFrame or LazyFrame. - cols : Optional[List[str]] - The list of columns to consider when finding unique rows. If None, all columns - are used. The first column in this list is used as the partition column. - - Returns: - -------- - pl.DataFrame - A dataframe containing only the unique rows from the input dataframe, - based on the specified columns. - - Notes: - ------ - The function performs these operations: - 1. Converts LazyFrame to DataFrame if necessary - 2. Partitions the dataframe by the first column in cols (or the first column of the dataframe if cols is None) - 3. Applies the unique operation to each partition based on the remaining columns - 4. Concatenates the results back into a single dataframe - 5. Frees memory by deleting intermediate objects - - This implementation uses tqdm to provide a progress bar during processing, - which is particularly helpful for large datasets where the operation may take time. 
- """ - if isinstance(_df, pl.LazyFrame): - _df = collect_lazy_frame(_df) - from tqdm import tqdm - partition_col = cols[0] if cols is not None else _df.columns[0] - other_cols = cols[1:] if cols is not None else _df.columns[1:] - partitioned_df = _df.partition_by(partition_col) - df = pl.concat([partition.unique(other_cols) for partition in tqdm(partitioned_df)]) - del partitioned_df, _df - return df - - -def combine_matches(matching_dfs: List[pl.LazyFrame]): - all_matching_indexes = matching_dfs[-1].select('__left_index', '__right_index') - for matching_df in matching_dfs: - all_matching_indexes = all_matching_indexes.join(matching_df, on=['__left_index', '__right_index']) - return all_matching_indexes - - -def add_index_column(df: pl.LazyFrame, column_name: str, tempdir: str): - return cache_polars_frame_to_temp(df.with_row_index(name=column_name), tempdir) - - -def process_fuzzy_mapping( - fuzzy_map: FuzzyMapping, - left_df: pl.LazyFrame, - right_df: pl.LazyFrame, - existing_matches: Optional[pl.LazyFrame], - local_temp_dir_ref: str, - i: int, - flowfile_logger: Logger, - existing_number_of_matches: Optional[int] = None -) -> Tuple[pl.LazyFrame, int]: - """ - Process a single fuzzy mapping to generate matching dataframes. - - Args: - fuzzy_map: The fuzzy mapping configuration containing match columns and thresholds - left_df: Left dataframe with index column - right_df: Right dataframe with index column - existing_matches: Previously computed matches (or None) - local_temp_dir_ref: Temporary directory reference for caching interim results - i: Index of the current fuzzy mapping - flowfile_logger: Logger instance for progress tracking - existing_number_of_matches: Number of existing matches (if available) - - Returns: - Tuple[pl.LazyFrame, int]: The final matching dataframe and the number of matches - """ - # Determine join strategy based on existing matches - if existing_matches is not None: - existing_matches = existing_matches.select('__left_index', '__right_index') - flowfile_logger.info(f'Filtering existing fuzzy matches for {fuzzy_map.left_col} and {fuzzy_map.right_col}') - cross_join_frame = cross_join_filter_existing_fuzzy_results( - left_df=left_df, - right_df=right_df, - existing_matches=existing_matches, - left_col_name=fuzzy_map.left_col, - right_col_name=fuzzy_map.right_col - ) - else: - flowfile_logger.info(f'Performing fuzzy match for {fuzzy_map.left_col} and {fuzzy_map.right_col}') - cross_join_frame = cross_join_no_existing_fuzzy_results( - left_df=left_df, - right_df=right_df, - left_col_name=fuzzy_map.left_col, - right_col_name=fuzzy_map.right_col, - temp_dir_ref=local_temp_dir_ref, - flowfile_logger=flowfile_logger - ) - - # Calculate fuzzy match scores - flowfile_logger.info(f'Calculating fuzzy match for {fuzzy_map.left_col} and {fuzzy_map.right_col}') - matching_df = calculate_and_parse_fuzzy( - mapping_table=cross_join_frame, - left_col_name=fuzzy_map.left_col, - right_col_name=fuzzy_map.right_col, - fuzzy_method=fuzzy_map.fuzzy_type, - th_score=fuzzy_map.reversed_threshold_score - ) - if existing_matches is not None: - matching_df = matching_df.join(existing_matches, on=['__left_index', '__right_index']) - matching_df = cache_polars_frame_to_temp(matching_df, local_temp_dir_ref) - if existing_number_of_matches is None or existing_number_of_matches > 100_000_000: - existing_number_of_matches = matching_df.select(pl.len()).collect()[0, 0] - if existing_number_of_matches > 100_000_000: - return unique_df_large(matching_df.rename({'s': f'fuzzy_score_{i}'})).lazy(), 
existing_number_of_matches - else: - return matching_df.rename({'s': f'fuzzy_score_{i}'}).unique(), existing_number_of_matches - - -def perform_all_fuzzy_matches(left_df: pl.LazyFrame, - right_df: pl.LazyFrame, - fuzzy_maps: List[FuzzyMapping], - flowfile_logger: Logger, - local_temp_dir_ref: str, - ) -> List[pl.LazyFrame]: - matching_dfs = [] - existing_matches = None - existing_number_of_matches = None - for i, fuzzy_map in enumerate(fuzzy_maps): - existing_matches, existing_number_of_matches = process_fuzzy_mapping( - fuzzy_map=fuzzy_map, - left_df=left_df, - right_df=right_df, - existing_matches=existing_matches, - local_temp_dir_ref=local_temp_dir_ref, - i=i, - flowfile_logger=flowfile_logger, - existing_number_of_matches=existing_number_of_matches - ) - matching_dfs.append(existing_matches) - return matching_dfs - - -def fuzzy_match_dfs( - left_df: pl.LazyFrame, - right_df: pl.LazyFrame, - fuzzy_maps: List[FuzzyMapping], - flowfile_logger: Logger -) -> pl.DataFrame: - """ - Perform fuzzy matching between two dataframes using multiple fuzzy mapping configurations. - - Args: - left_df: Left dataframe to be matched - right_df: Right dataframe to be matched - fuzzy_maps: List of fuzzy mapping configurations - flowfile_logger: Logger instance for tracking progress - - Returns: - pl.DataFrame: The final matched dataframe with all fuzzy scores - """ - left_df, right_df, fuzzy_maps = pre_process_for_fuzzy_matching(left_df, right_df, fuzzy_maps, flowfile_logger) - - # Create a temporary directory for caching intermediate results - local_temp_dir = tempfile.TemporaryDirectory() - local_temp_dir_ref = local_temp_dir.name - - # Add index columns to both dataframes - left_df = add_index_column(left_df, '__left_index', local_temp_dir_ref) - right_df = add_index_column(right_df, '__right_index', local_temp_dir_ref) - - matching_dfs = perform_all_fuzzy_matches(left_df, right_df, fuzzy_maps, flowfile_logger, local_temp_dir_ref) - - # Combine all matches - if len(matching_dfs) > 1: - flowfile_logger.info('Combining fuzzy matches') - all_matches_df = combine_matches(matching_dfs) - else: - flowfile_logger.info('Caching fuzzy matches') - all_matches_df = cache_polars_frame_to_temp(matching_dfs[0], local_temp_dir_ref) - - # Join matches with original dataframes - flowfile_logger.info('Joining fuzzy matches with original dataframes') - output_df = collect_lazy_frame( - (left_df.join(all_matches_df, on='__left_index') - .join(right_df, on='__right_index') - .drop('__right_index', '__left_index')) - ) - - # Clean up temporary files - flowfile_logger.info('Cleaning up temporary files') - local_temp_dir.cleanup() - - return output_df diff --git a/flowfile_worker/flowfile_worker/polars_fuzzy_match/models.py b/flowfile_worker/flowfile_worker/polars_fuzzy_match/models.py deleted file mode 100644 index 80f02395..00000000 --- a/flowfile_worker/flowfile_worker/polars_fuzzy_match/models.py +++ /dev/null @@ -1,36 +0,0 @@ -from dataclasses import dataclass -from typing import Optional, Literal - -FuzzyTypeLiteral = Literal['levenshtein','jaro', 'jaro_winkler', 'hamming', 'damerau_levenshtein', 'indel'] - - -@dataclass -class JoinMap: - left_col: str - right_col: str - - -@dataclass -class FuzzyMapping(JoinMap): - threshold_score: float = 80.0 - fuzzy_type: FuzzyTypeLiteral = 'levenshtein' - perc_unique: float = 0.0 - output_column_name: Optional[str] = None - valid: bool = True - - def __init__(self, left_col: str, right_col: str = None, threshold_score: float = 80.0, - fuzzy_type: FuzzyTypeLiteral = 'levenshtein', 
perc_unique: float = 0, output_column_name: str = None, - valid: bool = True): - if right_col is None: - right_col = left_col - self.valid = valid - self.left_col = left_col - self.right_col = right_col - self.threshold_score = threshold_score - self.fuzzy_type = fuzzy_type - self.perc_unique = perc_unique - self.output_col_name = output_column_name if output_column_name is not None else f'fuzzy_score_{left_col}_{right_col}' - - @property - def reversed_threshold_score(self) -> float: - return ((int(self.threshold_score) - 100) * -1) / 100 diff --git a/flowfile_worker/flowfile_worker/polars_fuzzy_match/pre_process.py b/flowfile_worker/flowfile_worker/polars_fuzzy_match/pre_process.py deleted file mode 100644 index cdb70372..00000000 --- a/flowfile_worker/flowfile_worker/polars_fuzzy_match/pre_process.py +++ /dev/null @@ -1,213 +0,0 @@ -from logging import Logger -from typing import List, Dict, Tuple - -import polars as pl - -from flowfile_worker.polars_fuzzy_match.models import FuzzyMapping -from flowfile_worker.utils import collect_lazy_frame - - -def get_approx_uniqueness(lf: pl.LazyFrame) -> Dict[str, int]: - """ - Calculate the approximate number of unique values for each column in a LazyFrame. - - Args: - lf (pl.LazyFrame): Input LazyFrame to analyze. - - Returns: - Dict[str, int]: Dictionary mapping column names to their approximate unique value counts. - - Raises: - Exception: If the uniqueness calculation fails (empty result). - """ - uniqueness = lf.select(pl.all().approx_n_unique()).collect().to_dicts() - if len(uniqueness) == 0: - raise Exception('Approximate uniqueness calculation failed') - return uniqueness[0] - - -def calculate_uniqueness(a: float, b: float) -> float: - """ - Calculate a combined uniqueness score from two individual uniqueness ratios. - - The formula prioritizes columns with high combined uniqueness while accounting for - differences between the two input values. - - Args: - a (float): First uniqueness ratio, typically from the left dataframe. - b (float): Second uniqueness ratio, typically from the right dataframe. - - Returns: - float: Combined uniqueness score. - """ - return ((pow(a + 0.5, 2) + pow(b + 0.5, 2)) / 2 - pow(0.5, 2)) + 0.5 * abs(a - b) - - -def calculate_df_len(df: pl.LazyFrame) -> int: - """ - Calculate the number of rows in a LazyFrame. - - Args: - df (pl.LazyFrame): Input LazyFrame. - - Returns: - int: Number of rows in the LazyFrame. - """ - return collect_lazy_frame(df.select(pl.len()))[0, 0] - - -def fill_perc_unique_in_fuzzy_maps(left_df: pl.LazyFrame, right_df: pl.LazyFrame, fuzzy_maps: List[FuzzyMapping], - flowfile_logger: Logger, left_len: int, right_len: int) -> List[FuzzyMapping]: - """ - Calculate and set uniqueness percentages for all fuzzy mapping columns. - - Computes the approximate unique value counts in both dataframes for the columns - specified in fuzzy_maps, then calculates a combined uniqueness score for each mapping. - - Args: - left_df (pl.LazyFrame): Left dataframe. - right_df (pl.LazyFrame): Right dataframe. - fuzzy_maps (List[FuzzyMapping]): List of fuzzy mappings between left and right columns. - flowfile_logger (Logger): Logger for information output. - left_len (int): Number of rows in the left dataframe. - right_len (int): Number of rows in the right dataframe. - - Returns: - List[FuzzyMapping]: Updated fuzzy mappings with calculated uniqueness percentages. 
- """ - left_unique_values = get_approx_uniqueness(left_df.select(fuzzy_map.left_col for fuzzy_map in fuzzy_maps)) - right_unique_values = get_approx_uniqueness(right_df.select(fuzzy_map.right_col for fuzzy_map in fuzzy_maps)) - flowfile_logger.info(f'Left unique values: {left_unique_values}') - flowfile_logger.info(f'Right unique values: {right_unique_values}') - for fuzzy_map in fuzzy_maps: - fuzzy_map.perc_unique = calculate_uniqueness(left_unique_values[fuzzy_map.left_col] / left_len, - right_unique_values[fuzzy_map.right_col] / right_len) - return fuzzy_maps - - -def determine_order_of_fuzzy_maps(fuzzy_maps: List[FuzzyMapping]) -> List[FuzzyMapping]: - """ - Sort fuzzy mappings by their uniqueness percentages in descending order. - - This ensures that columns with higher uniqueness are prioritized in the - fuzzy matching process. - - Args: - fuzzy_maps (List[FuzzyMapping]): List of fuzzy mappings between columns. - - Returns: - List[FuzzyMapping]: Sorted list of fuzzy mappings by uniqueness (highest first). - """ - return sorted(fuzzy_maps, key=lambda x: x.perc_unique, reverse=True) - - -def calculate_uniqueness_rate(fuzzy_maps: List[FuzzyMapping]) -> float: - """ - Calculate the total uniqueness rate across all fuzzy mappings. - - Args: - fuzzy_maps (List[FuzzyMapping]): List of fuzzy mappings with calculated uniqueness. - - Returns: - float: Sum of uniqueness percentages across all mappings. - """ - return sum(jm.perc_unique for jm in fuzzy_maps) - - -def determine_need_for_aggregation(uniqueness_rate: float, cartesian_join_number: int) -> bool: - """ - Determine if aggregation is needed based on uniqueness and potential join size. - - Aggregation helps prevent explosive cartesian joins when matching columns - have low uniqueness, which could lead to performance issues. - - Args: - uniqueness_rate (float): Total uniqueness rate across fuzzy mappings. - cartesian_join_number (int): Potential size of the cartesian join (left_len * right_len). - - Returns: - bool: True if aggregation is needed, False otherwise. - """ - return uniqueness_rate < 1.2 and cartesian_join_number > 1_000_000 - - -def aggregate_output(left_df: pl.LazyFrame, right_df: pl.LazyFrame, - fuzzy_maps: List[FuzzyMapping]) -> Tuple[pl.LazyFrame, pl.LazyFrame]: - """ - Deduplicate the dataframes based on the fuzzy mapping columns. - - This reduces the size of the join by removing duplicate rows when the - uniqueness rate is low and the potential join size is large. - - Args: - left_df (pl.LazyFrame): Left dataframe. - right_df (pl.LazyFrame): Right dataframe. - fuzzy_maps (List[FuzzyMapping]): List of fuzzy mappings between columns. - - Returns: - Tuple[pl.LazyFrame, pl.LazyFrame]: Deduplicated left and right dataframes. - """ - left_df = left_df.unique([fuzzy_map.left_col for fuzzy_map in fuzzy_maps]) - right_df = right_df.unique([fuzzy_map.right_col for fuzzy_map in fuzzy_maps]) - return left_df, right_df - - -def report_on_order_of_fuzzy_maps(fuzzy_maps: List[FuzzyMapping], flowfile_logger: Logger) -> None: - """ - Log the order of fuzzy mappings based on uniqueness. - Parameters - ---------- - fuzzy_maps: List[FuzzyMapping] - flowfile_logger: Logger - - ------- - """ - flowfile_logger.info('Fuzzy mappings sorted by uniqueness') - for i, fuzzy_map in enumerate(fuzzy_maps): - flowfile_logger.info(f'{i}. 
Fuzzy mapping: {fuzzy_map.left_col} -> {fuzzy_map.right_col} ' - f'Uniqueness: {fuzzy_map.perc_unique}') - - -def pre_process_for_fuzzy_matching(left_df: pl.LazyFrame, right_df: pl.LazyFrame, - fuzzy_maps: List[FuzzyMapping], - flowfile_logger: Logger) -> Tuple[pl.LazyFrame, pl.LazyFrame, List[FuzzyMapping]]: - """ - Preprocess dataframes and fuzzy mappings for optimal fuzzy matching. - - This function: - 1. Calculates dataframe sizes - 2. Calculates uniqueness percentages for each fuzzy mapping - 3. Sorts the fuzzy mappings by uniqueness - 4. Determines if aggregation is needed to prevent large cartesian joins - 5. Performs aggregation if necessary - - Args: - left_df (pl.LazyFrame): Left dataframe. - right_df (pl.LazyFrame): Right dataframe. - fuzzy_maps (List[FuzzyMapping]): List of fuzzy mappings between columns. - flowfile_logger (Logger): Logger for information output. - - Returns: - Tuple[pl.LazyFrame, pl.LazyFrame, List[FuzzyMapping]]: - - Potentially modified left dataframe - - Potentially modified right dataframe - - Sorted and updated fuzzy mappings - """ - flowfile_logger.info('Optimizing data and settings for fuzzy matching') - left_df_len = calculate_df_len(left_df) - right_df_len = calculate_df_len(right_df) - if left_df_len == 0 or right_df_len == 0: - return left_df, right_df, fuzzy_maps - fuzzy_maps = fill_perc_unique_in_fuzzy_maps(left_df, right_df, fuzzy_maps, flowfile_logger, left_df_len, - right_df_len) - fuzzy_maps = determine_order_of_fuzzy_maps(fuzzy_maps) - report_on_order_of_fuzzy_maps(fuzzy_maps, flowfile_logger) - - uniqueness_rate = calculate_uniqueness_rate(fuzzy_maps) - flowfile_logger.info(f'Uniqueness rate: {uniqueness_rate}') - if determine_need_for_aggregation(uniqueness_rate, left_df_len * right_df_len): - flowfile_logger.warning('The join fields are not unique enough, resulting in many duplicates, ' - 'therefore removing duplicates on the join field') - left_df, right_df = aggregate_output(left_df, right_df, fuzzy_maps) - flowfile_logger.info('Data and settings optimized for fuzzy matching') - return left_df, right_df, fuzzy_maps diff --git a/flowfile_worker/flowfile_worker/polars_fuzzy_match/process.py b/flowfile_worker/flowfile_worker/polars_fuzzy_match/process.py deleted file mode 100644 index 9178c381..00000000 --- a/flowfile_worker/flowfile_worker/polars_fuzzy_match/process.py +++ /dev/null @@ -1,86 +0,0 @@ -import polars as pl -import polars_distance as pld -from flowfile_worker.polars_fuzzy_match.utils import cache_polars_frame_to_temp -from flowfile_worker.utils import collect_lazy_frame -from flowfile_worker.polars_fuzzy_match.models import FuzzyTypeLiteral - - -def calculate_fuzzy_score(mapping_table: pl.LazyFrame, left_col_name: str, right_col_name: str, - fuzzy_method: FuzzyTypeLiteral, th_score: float) -> pl.LazyFrame: - """ - Calculate fuzzy matching scores between columns in a LazyFrame. 
- - Args: - mapping_table: The DataFrame containing columns to compare - left_col_name: Name of the left column for comparison - right_col_name: Name of the right column for comparison - fuzzy_method: Type of fuzzy matching algorithm to use - th_score: The threshold score for fuzzy matching - - Returns: - A LazyFrame with fuzzy matching scores - """ - mapping_table = mapping_table.with_columns(pl.col(left_col_name).str.to_lowercase().alias('left'), - pl.col(right_col_name).str.to_lowercase().alias('right')) - dist_col = pld.DistancePairWiseString(pl.col('left')) - if fuzzy_method in ("jaro_winkler"): - fm_method = getattr(dist_col, fuzzy_method)(pl.col('right')).alias('s') - else: - fm_method = getattr(dist_col, fuzzy_method)(pl.col('right'), normalized=True).alias('s') - return (mapping_table.with_columns(fm_method).drop(['left', 'right']).filter(pl.col('s') <= th_score). - with_columns((1-pl.col('s')).alias('s'))) - - -def process_fuzzy_frames(left_df: pl.LazyFrame, right_df: pl.LazyFrame, left_col_name: str, right_col_name: str, - temp_dir_ref: str): - """ - Process left and right data frames to create fuzzy frames, - cache them temporarily, and adjust based on their lengths. - - Args: - - left_df (pl.DataFrame): The left data frame. - - right_df (pl.DataFrame): The right data frame. - - fm (object): An object containing configuration such as the left column name. - - temp_dir_ref (str): A reference to the temporary directory for caching frames. - - Returns: - - Tuple[pl.DataFrame, pl.DataFrame, str, str]: Processed left and right fuzzy frames and their respective column names. - """ - - # Process left and right data frames - left_fuzzy_frame = cache_polars_frame_to_temp(left_df.group_by(left_col_name).agg('__left_index'). - filter(pl.col(left_col_name).is_not_null()), temp_dir_ref) - right_fuzzy_frame = cache_polars_frame_to_temp(right_df.group_by(right_col_name).agg('__right_index'). - filter(pl.col(right_col_name).is_not_null()), temp_dir_ref) - # Calculate lengths of fuzzy frames - len_left_df = collect_lazy_frame(left_fuzzy_frame.select(pl.len()))[0, 0] - len_right_df = collect_lazy_frame(right_fuzzy_frame.select(pl.len()))[0, 0] - - # Decide which frame to use as left or right based on their lengths - if len_left_df < len_right_df: - # Swap the frames and column names if right frame is larger - left_fuzzy_frame, right_fuzzy_frame = right_fuzzy_frame, left_fuzzy_frame - left_col_name, right_col_name = right_col_name, left_col_name - - # Return the processed frames and column names - return left_fuzzy_frame, right_fuzzy_frame, left_col_name, right_col_name, len_left_df, len_right_df - - -def calculate_and_parse_fuzzy(mapping_table: pl.LazyFrame, left_col_name: str, right_col_name: str, - fuzzy_method: FuzzyTypeLiteral, th_score: float) -> pl.LazyFrame: - """ - Calculate fuzzy scores and parse/explode the results for further processing. 
- - Args: - mapping_table: The DataFrame containing columns to compare - left_col_name: Name of the left column for comparison - right_col_name: Name of the right column for comparison - fuzzy_method: Type of fuzzy matching algorithm to use - th_score: Minimum similarity score threshold (0-1) - - Returns: - A LazyFrame with exploded indices and fuzzy scores - """ - return calculate_fuzzy_score(mapping_table, left_col_name, right_col_name, fuzzy_method, th_score).select( - pl.col('s'), pl.col('__left_index'), pl.col('__right_index')).explode(pl.col('__left_index')).explode( - pl.col('__right_index')) diff --git a/flowfile_worker/flowfile_worker/polars_fuzzy_match/utils.py b/flowfile_worker/flowfile_worker/polars_fuzzy_match/utils.py deleted file mode 100644 index 183120ab..00000000 --- a/flowfile_worker/flowfile_worker/polars_fuzzy_match/utils.py +++ /dev/null @@ -1,50 +0,0 @@ -import polars as pl -from flowfile_worker.configs import logger -from flowfile_worker.utils import collect_lazy_frame -import os -import uuid - - -def write_polars_frame(_df: pl.LazyFrame | pl.DataFrame, path: str, - estimated_size: int = 0): - is_lazy = isinstance(_df, pl.LazyFrame) - logger.info('Caching data frame') - if is_lazy: - if estimated_size > 0: - fit_memory = estimated_size / 1024 / 1000 / 1000 < 8 - if fit_memory: - _df = _df.collect() - is_lazy = False - - if is_lazy: - logger.info("Writing in memory efficient mode") - write_method = getattr(_df, 'sink_ipc') - try: - write_method(path) - return True - except Exception as e: - pass - try: - write_method(path) - return True - except Exception as e: - pass - if is_lazy: - _df = collect_lazy_frame(_df) - try: - write_method = getattr(_df, 'write_ipc') - write_method(path) - return True - except Exception as e: - print('error', e) - return False - - -def cache_polars_frame_to_temp(_df: pl.LazyFrame | pl.DataFrame, tempdir: str = None) -> pl.LazyFrame: - path = f'{tempdir}{os.sep}{uuid.uuid4()}' - result = write_polars_frame(_df, path) - if result: - df = pl.read_ipc(path) - return df.lazy() - else: - raise Exception('Could not cache the data') diff --git a/flowfile_worker/tests/polars_fuzzy_match/__init__.py b/flowfile_worker/tests/polars_fuzzy_match/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/flowfile_worker/tests/polars_fuzzy_match/match_utils.py b/flowfile_worker/tests/polars_fuzzy_match/match_utils.py deleted file mode 100644 index af346bf4..00000000 --- a/flowfile_worker/tests/polars_fuzzy_match/match_utils.py +++ /dev/null @@ -1,496 +0,0 @@ -import polars as pl -import numpy as np -import random -import string -from typing import Tuple, List, Dict, Any -from flowfile_worker.polars_fuzzy_match.models import FuzzyMapping - - -def introduce_typos(text: str, error_rate: float = 0.2) -> str: - """ - Introduces random typos in text to simulate fuzzy matching scenarios. 
- - Args: - text: Original text - error_rate: Probability of each character being altered - - Returns: - Text with typos - """ - if not text or error_rate <= 0: - return text - - result = "" - for char in text: - if random.random() < error_rate: - # Choose type of error: insert, delete, replace - error_type = random.choice(["insert", "delete", "replace"]) - - if error_type == "insert": - result += random.choice(string.ascii_letters) + char - elif error_type == "delete" and len(text) > 3: - # Skip this character (but ensure we don't delete too much) - pass - elif error_type == "replace": - result += random.choice(string.ascii_letters) - else: - result += char - else: - result += char - - return result - - -# Data generation functions -def generate_large_scale_data( - size: int = 10000, - match_rate: float = 0.7, - error_rate: float = 0.2 -) -> Dict[str, Dict[str, List[Any]]]: - """ - Generates raw data for large-scale test scenarios. - - Args: - size: Number of rows to generate - match_rate: Proportion of rows that should have a match - error_rate: Rate of typos in matching fields - - Returns: - Dictionary with 'left_data' and 'right_data' dictionaries - """ - # Generate company names - companies = [f"Company {i}" for i in range(size)] - - # For right dataframe, some will match with typos, some won't match at all - right_companies = [] - for i in range(size): - if random.random() < match_rate: - # This one should match with possible typos - right_companies.append(introduce_typos(companies[i], error_rate)) - else: - # This one shouldn't match - right_companies.append(f"Different Company {i + size}") - - # Generate other fields - addresses = [f"{random.randint(1, 9999)} Main St, City {i}" for i in range(size)] - right_addresses = [ - introduce_typos(addr, error_rate) if random.random() < match_rate else f"Different Address {i}" - for i, addr in enumerate(addresses) - ] - - # Generate countries (not very unique - limited set) - countries = ["USA", "Canada", "UK", "Germany", "France", "Italy", "Japan", "China", "India", "Brazil"] - left_countries = [countries[i % len(countries)] for i in range(size)] - right_countries = [] - for i in range(size): - if random.random() < match_rate: - # This one should match with possible typos - country = left_countries[i] - if random.random() < error_rate: - # Add some typos occasionally - country = introduce_typos(country, error_rate) - right_countries.append(country) - else: - # Pick a different country - different_country_index = (i % len(countries) + random.randint(1, len(countries) - 1)) % len(countries) - right_countries.append(countries[different_country_index]) - - # Create left dataframe data - left_data = { - "id": list(range(1, size + 1)), - "company_name": companies, - "address": addresses, - "country": left_countries - } - - # Create right dataframe data - right_data = { - "id": list(range(1001, size + 1001)), - "organization": right_companies, - "location": right_addresses, - "country_code": right_countries - } - - return { - "left_data": left_data, - "right_data": right_data - } - -def generate_edge_case_data() -> Dict[str, Dict[str, Dict[str, List[Any]]]]: - """ - Generates raw data for different edge case scenarios. 
- - Returns: - Dictionary mapping scenario names to data dictionaries - """ - edge_case_data = {} - - # Case 1: Empty dataframes - edge_case_data["empty"] = { - "left_data": {"company_name": [], "address": []}, - "right_data": {"organization": [], "location": []} - } - - # Case 2: One-to-many matches - edge_case_data["one_to_many"] = { - "left_data": { - "id": [1], - "company_name": ["ACME Corporation"], - "address": ["100 Main St"] - }, - "right_data": { - "id": [101, 102, 103, 104, 105], - "organization": ["ACME Corp", "ACME Corp.", "ACME Inc", "ACME Corporation", "Completely Different"], - "location": ["100 Main Street", "100 Main St", "100 Main", "Different Address", "Different Address 2"] - } - } - - # Case 3: Many-to-one matches - edge_case_data["many_to_one"] = { - "left_data": { - "id": [1, 2, 3, 4, 5], - "company_name": ["ACME Corp", "ACME Corp.", "ACME Inc", "ACME Corporation", "Completely Different"], - "address": ["100 Main Street", "100 Main St", "100 Main", "Different Address", "Different Address 2"] - }, - "right_data": { - "id": [101], - "organization": ["ACME Corporation"], - "location": ["100 Main St"] - } - } - - # Case 4: Multiple fuzzy criteria with varying thresholds - edge_case_data["multi_criteria"] = { - "left_data": { - "id": [1, 2, 3, 4, 5], - "name": ["John Smith", "Jane Doe", "Bob Johnson", "Alice Brown", "David Miller"], - "email": ["jsmith@example.com", "jane.doe@example.com", "bob.j@example.com", "alice@example.com", - "david@example.com"], - "phone": ["555-1234", "555-5678", "555-9012", "555-3456", "555-7890"] - }, - "right_data": { - "id": [101, 102, 103, 104, 105], - "full_name": ["John Smith", "Jane Doe", "Robert Johnson", "Alice B.", "Dave Miller"], - "contact_email": ["john.smith@example.com", "janedoe@example.com", "bob.johnson@example.com", - "alice@different.com", "d.miller@example.com"], - "contact_phone": ["555-1234", "555-5678", "555-9999", "555-3456", "555-7890"] - } - } - - # Case 5: Null values in key columns - edge_case_data["null_values"] = { - "left_data": { - "id": [1, 2, 3, 4, 5], - "company_name": ["Company A", None, "Company C", "Company D", "Company E"], - "address": ["Address 1", "Address 2", None, "Address 4", "Address 5"] - }, - "right_data": { - "id": [101, 102, 103, 104, 105], - "organization": ["Company A", "Company B", "Company C", None, "Company E"], - "location": ["Address 1", "Address 2", "Address 3", "Address 4", None] - } - } - - return edge_case_data - - -# Create LazyFrames from raw data -def create_lazy_frames(data: Dict[str, List[Any]]) -> pl.LazyFrame: - """ - Creates a LazyFrame from dictionary data. - - Args: - data: Dictionary with column names as keys and data as values - - Returns: - pl.LazyFrame - """ - return pl.DataFrame(data).lazy() - - -# Create FuzzyMappings -def create_fuzzy_mappings( - left_col: str, - right_col: str, - fuzzy_type: str = "jaro_winkler", - threshold_score: float = 20.0, - perc_unique: float = 1.0 -) -> FuzzyMapping: - """ - Creates a FuzzyMapping object. 
- - Args: - left_col: Column name in left dataframe - right_col: Column name in right dataframe - fuzzy_type: Type of fuzzy matching algorithm - threshold_score: Threshold score for fuzzy matching (0-100) - perc_unique: Percentage uniqueness factor - - Returns: - FuzzyMapping object - """ - return FuzzyMapping( - left_col=left_col, - right_col=right_col, - fuzzy_type=fuzzy_type, - threshold_score=threshold_score, - perc_unique=perc_unique - ) - - -def create_fuzzy_maps(): - # Create the fuzzy mappings - fuzzy_mappings = [ - create_fuzzy_mappings( - left_col="company_name", - right_col="organization", - fuzzy_type="levenshtein", - threshold_score=80.0, # 20% threshold corresponds to 0.8 reversed (80% similarity) - perc_unique=1.0 - ), - create_fuzzy_mappings( - left_col="address", - right_col="location", - fuzzy_type="levenshtein", - threshold_score=80.0, # 20% threshold corresponds to 0.8 reversed (80% similarity) - perc_unique=1.2 - ), - create_fuzzy_mappings( - left_col="country", - right_col="country_code", - fuzzy_type="jaro_winkler", - threshold_score=90.0, # Higher threshold for country codes as they should be more exact - perc_unique=0.5 # Lower uniqueness factor since countries are not very unique - ) - ] - return fuzzy_mappings - - -# Combined utility functions (for backward compatibility) -def create_test_data( - size: int = 10000, - match_rate: float = 0.7, - error_rate: float = 0.2 -) -> Tuple[pl.LazyFrame, pl.LazyFrame, List[FuzzyMapping]]: - """ - Creates large-scale test data for performance testing. - - Args: - size: Number of rows in each dataframe - match_rate: Proportion of rows that should have a match - error_rate: Rate of typos in matching fields - - Returns: - Tuple of (left_df, right_df, fuzzy_mappings) - """ - # Generate the data - data = generate_large_scale_data(size, match_rate, error_rate) - - # Create the LazyFrames - left_df = create_lazy_frames(data["left_data"]) - right_df = create_lazy_frames(data["right_data"]) - - return left_df, right_df, create_fuzzy_maps() - - -def create_edge_case_test_data() -> Dict[str, Tuple[pl.LazyFrame, pl.LazyFrame, List[FuzzyMapping]]]: - """ - Creates a dictionary of edge case test scenarios. 
- - Returns: - Dictionary mapping scenario name to (left_df, right_df, fuzzy_mappings) - """ - # Generate the edge case data - edge_case_data = generate_edge_case_data() - - # Process each edge case - edge_cases = {} - for case_name, data in edge_case_data.items(): - # Create the LazyFrames - left_df = create_lazy_frames(data["left_data"]) - right_df = create_lazy_frames(data["right_data"]) - - # Create appropriate fuzzy mappings based on the case - if case_name == "empty": - fuzzy_mappings = [ - create_fuzzy_mappings( - left_col="company_name", - right_col="organization", - threshold_score=20.0 - ) - ] - elif case_name == "one_to_many" or case_name == "many_to_one": - fuzzy_mappings = [ - create_fuzzy_mappings( - left_col="company_name", - right_col="organization", - threshold_score=30.0 - ) - ] - elif case_name == "multi_criteria": - fuzzy_mappings = [ - create_fuzzy_mappings( - left_col="name", - right_col="full_name", - ), - create_fuzzy_mappings( - left_col="email", - right_col="contact_email", - fuzzy_type="levenshtein", - threshold_score=30.0, - ), - create_fuzzy_mappings( - left_col="phone", - right_col="contact_phone", - fuzzy_type="exact", - threshold_score=0.0, - ) - ] - elif case_name == "null_values": - fuzzy_mappings = [ - create_fuzzy_mappings( - left_col="company_name", - right_col="organization", - threshold_score=20.0 - ) - ] - else: - fuzzy_mappings = [ - create_fuzzy_mappings( - left_col="company_name", - right_col="organization" - ) - ] - - edge_cases[case_name] = (left_df, right_df, fuzzy_mappings) - - return edge_cases - - -def generate_small_fuzzy_test_data_left() -> pl.DataFrame: - """ - Generates a small, predictable test dataset with data designed for fuzzy matching challenges. - - Returns: - LazyFrame with left side test data - """ - return pl.DataFrame({ - "id": [1, 2, 3, 4, 5], - "company_name": ["Apple Inc.", "Microsft", "Amazon", "Gogle", "Facebok"], - "address": ["1 Apple Park", "One Microsoft Way", "410 Terry Ave N", "1600 Amphitheatre", "1 Hacker Way"], - "contact": ["Tim Cook", "Satya Ndella", "Andy Jessy", "Sundar Pichai", "Mark Zukerberg"] - }) - - -def generate_small_fuzzy_test_data_right() -> pl.DataFrame: - """ - Generates a small, predictable test dataset with variations for fuzzy matching. - - Returns: - LazyFrame with right side test data - """ - return pl.DataFrame({ - "id": [101, 102, 103, 104, 105], - "organization": ["Apple Incorporated", "Microsoft Corp", "Amazon.com Inc", "Google LLC", "Facebook Inc"], - "location": ["Apple Park, Cupertino", "Microsoft Way, Redmond", "Terry Ave North, Seattle", - "Amphitheatre Pkwy, Mountain View", "Hacker Way, Menlo Park"], - "ceo": ["Timothy Cook", "Satya Nadella", "Andy Jassy", "Sundar Pichai", "Mark Zuckerberg"] - }) - - -def generate_small_fuzzy_test_mappings() -> List[FuzzyMapping]: - """ - Creates fuzzy mappings for the small test dataset. - - Returns: - List of FuzzyMapping objects - """ - return [ - create_fuzzy_mappings( - left_col="company_name", - right_col="organization", - fuzzy_type="jaro_winkler", - threshold_score=20.0 - ), - create_fuzzy_mappings( - left_col="contact", - right_col="ceo", - fuzzy_type="levenshtein", - threshold_score=30.0 - ) - ] - - -def generate_small_fuzzy_test_data() -> Tuple[pl.DataFrame, pl.DataFrame, List[FuzzyMapping]]: - """ - Generates small test data for fuzzy matching. 
- """ - - left_df = generate_small_fuzzy_test_data_left() - right_df = generate_small_fuzzy_test_data_right() - fuzzy_mappings = generate_small_fuzzy_test_mappings() - return left_df, right_df, fuzzy_mappings - - -def create_deterministic_test_data(size=20): - """ - Creates deterministic test data with guaranteed unique values for cross join testing. - - Parameters: - ----------- - size : int - The number of rows in each dataframe - - Returns: - -------- - Tuple[pl.LazyFrame, pl.LazyFrame, List[FuzzyMapping]] - A tuple containing left dataframe, right dataframe, and fuzzy mappings - """ - import polars as pl - from flowfile_worker.polars_fuzzy_match.models import FuzzyMapping - - # Create deterministic data with unique values - left_data = { - "id": list(range(1, size + 1)), - "company_name": [f"Company_{i}" for i in range(1, size + 1)], - "address": [f"Address_{i}" for i in range(1, size + 1)], - "country": [f"Country_{i % 5}" for i in range(1, size + 1)] - } - - right_data = { - "id": list(range(101, size + 101)), - "organization": [f"Organization_{i}" for i in range(1, size + 1)], - "location": [f"Location_{i}" for i in range(1, size + 1)], - "country_code": [f"Code_{i % 5}" for i in range(1, size + 1)] - } - - # Create the LazyFrames - left_df = pl.DataFrame(left_data).lazy() - right_df = pl.DataFrame(right_data).lazy() - - # Create fuzzy mappings - fuzzy_mappings = [ - FuzzyMapping( - left_col="company_name", - right_col="organization", - fuzzy_type="levenshtein", - threshold_score=80.0, - perc_unique=1.0 - ), - FuzzyMapping( - left_col="address", - right_col="location", - fuzzy_type="levenshtein", - threshold_score=80.0, - perc_unique=1.2 - ), - FuzzyMapping( - left_col="country", - right_col="country_code", - fuzzy_type="jaro_winkler", - threshold_score=90.0, - perc_unique=0.5 - ) - ] - - return left_df, right_df, fuzzy_mappings diff --git a/flowfile_worker/tests/polars_fuzzy_match/test_matcher.py b/flowfile_worker/tests/polars_fuzzy_match/test_matcher.py deleted file mode 100644 index 827ab92b..00000000 --- a/flowfile_worker/tests/polars_fuzzy_match/test_matcher.py +++ /dev/null @@ -1,487 +0,0 @@ -import sys -import os -import tempfile -from typing import List - -# Set up import paths -try: - sys.path.append(os.path.dirname(os.path.abspath(__file__))) -except NameError: - sys.path.append(os.path.dirname(os.path.abspath('flowfile_worker/tests/polars_fuzzy_match/test_matcher.py'))) - -from match_utils import (generate_small_fuzzy_test_data_left, create_deterministic_test_data, - create_test_data, generate_small_fuzzy_test_data) - -import polars as pl -import pytest -import logging - -from flowfile_worker.polars_fuzzy_match.process import process_fuzzy_frames -from flowfile_worker.polars_fuzzy_match.pre_process import pre_process_for_fuzzy_matching - -# Import functions to test -from flowfile_worker.polars_fuzzy_match.matcher import ( - cross_join_large_files, - cross_join_small_files, - cross_join_filter_existing_fuzzy_results, - cross_join_no_existing_fuzzy_results, - unique_df_large, - combine_matches, - add_index_column, - fuzzy_match_dfs, - process_fuzzy_mapping, - perform_all_fuzzy_matches, - ensure_left_is_larger, - split_dataframe - -) - - -@pytest.fixture -def sample_dataframe(): - """Create a sample DataFrame for testing.""" - data = { - "name": ["John", "Alice", "Bob", "Charlie"], - "age": [30, 25, 35, 40], - "city": ["New York", "Boston", "Chicago", "Seattle"] - } - return pl.DataFrame(data).lazy() - - -@pytest.fixture -def flow_logger(): - return 
logging.getLogger('sample') - - -@pytest.fixture -def temp_directory(): - """Create a real temporary directory that will be cleaned up after the test.""" - with tempfile.TemporaryDirectory() as temp_dir: - print(f"Created temporary directory: {temp_dir}") - yield temp_dir - print("Temporary directory cleaned up") - - -def test_add_index_column(sample_dataframe, temp_directory): - """Test the add_index_column function.""" - # Use a real temporary directory - left_df, _, _ = create_test_data() - - result_df = add_index_column(left_df, '__test_index', temp_directory) - logging.info(f"Result columns: {result_df.columns}") - assert result_df is not None - - direct_df = left_df.with_row_index(name='__test_index').collect() - assert '__test_index' in direct_df.columns - assert list(direct_df['__test_index']) == list(range(direct_df.shape[0])) - - -def test_cross_join_small_files(temp_directory): - """Test the cross_join_small_files function.""" - left_df, right_df, mapping = create_test_data(10) - - left_col_name = mapping[0].left_col - right_col_name = mapping[0].right_col - left_df = add_index_column(left_df, '__left_index', temp_directory) - right_df = add_index_column(right_df, '__right_index', temp_directory) - - (left_fuzzy_frame, - right_fuzzy_frame, - left_col_name, - right_col_name, - len_left_df, - len_right_df) = process_fuzzy_frames( - left_df=left_df, - right_df=right_df, - left_col_name=left_col_name, - right_col_name=right_col_name, - temp_dir_ref=temp_directory - ) - - result_df = cross_join_small_files(left_fuzzy_frame, right_fuzzy_frame).collect() - assert result_df.select(pl.len())[0, 0] == len_left_df * len_right_df - assert set(result_df.columns) == {'company_name', '__left_index', 'organization', - '__right_index'}, 'Unexpected columns' - - -def create_test_dir(): - return tempfile.TemporaryDirectory() - -def test_cross_join_large_files(temp_directory, flow_logger): - """Test the cross_join_large_files function.""" - left_df, right_df, mapping = create_test_data(10_000) # Smaller size for test speed - - left_col_name = mapping[0].left_col - right_col_name = mapping[0].right_col - left_df = add_index_column(left_df, '__left_index', temp_directory) - right_df = add_index_column(right_df, '__right_index', temp_directory) - - (left_fuzzy_frame, - right_fuzzy_frame, - left_col_name, - right_col_name, - len_left_df, - len_right_df) = process_fuzzy_frames( - left_df=left_df, - right_df=right_df, - left_col_name=left_col_name, - right_col_name=right_col_name, - temp_dir_ref=temp_directory - ) - - logging.info(f"Left columns: {left_fuzzy_frame.columns}") - logging.info(f"Right columns: {right_fuzzy_frame.columns}") - - result_df = cross_join_large_files(left_fuzzy_frame, right_fuzzy_frame, left_col_name, right_col_name, - flow_logger).collect() - - logging.info(f"Result columns: {result_df.columns}") - assert result_df.select(pl.len())[0, 0] > 0 # Should return some rows - assert result_df.select(pl.len())[0, 0] < len_left_df * len_right_df - assert set(result_df.columns) == {'company_name', '__left_index', 'organization', - '__right_index'}, 'Unexpected columns' - - -def test_cross_join_filter_existing_fuzzy_results(temp_directory): - """Test cross_join_filter_existing_fuzzy_results function.""" - left_df, right_df, mapping = create_test_data(20) - - left_col_name = mapping[0].left_col - right_col_name = mapping[0].right_col - - # Add index columns - left_df = add_index_column(left_df, '__left_index', temp_directory) - right_df = add_index_column(right_df, '__right_index', 
temp_directory) - - # Create specific existing matches with a deliberate pattern - # Using indices that aren't sequential to ensure the function is properly filtering - existing_matches = pl.DataFrame({ - "__left_index": [0, 1, 2, 3], - "__right_index": [4, 3, 2, 1] - }, schema=[('__left_index', pl.UInt32), ('__right_index', pl.UInt32)]).lazy() - - # Before running the filter, verify we have our source data - left_collected = left_df.collect() - right_collected = right_df.collect() - - # Run the filter function - result_df = cross_join_filter_existing_fuzzy_results( - left_df, - right_df, - existing_matches, - left_col_name, - right_col_name - ).collect() - - # Verify results - assert "__left_index" in result_df.columns - assert "__right_index" in result_df.columns - assert left_col_name in result_df.columns - assert right_col_name in result_df.columns - - # Verify that the function correctly filtered on the existing matches - # The result should include only the mapping pairs that were in existing_matches - existing_pairs = list(zip(existing_matches.collect()["__left_index"].to_list(), - existing_matches.collect()["__right_index"].to_list())) - - result_pairs = [] - for row in result_df.iter_rows(named=True): - left_indices = row["__left_index"] - right_indices = row["__right_index"] - - # Handle both scalar and list types - if isinstance(left_indices, list) and isinstance(right_indices, list): - for left_idx in left_indices: - for right_idx in right_indices: - result_pairs.append((left_idx, right_idx)) - else: - result_pairs.append((left_indices, right_indices)) - - # Check that all result pairs correspond to existing matches - for left_idx, right_idx in result_pairs: - assert (left_idx, right_idx) in existing_pairs, f"Pair ({left_idx}, {right_idx}) not in existing matches" - - # Verify we have the expected number of matches - assert len(result_df) == len( - existing_matches.collect()), "Result should have same number of rows as existing matches" - - -def test_cross_join_no_existing_fuzzy_results(temp_directory, flow_logger): - """Test cross_join_no_existing_fuzzy_results function.""" - left_df, right_df, mapping = create_deterministic_test_data(20) - - left_col_name = mapping[0].left_col - right_col_name = mapping[0].right_col - - # Add index columns - left_df = add_index_column(left_df, '__left_index', temp_directory) - right_df = add_index_column(right_df, '__right_index', temp_directory) - - # Run the function - result_df = cross_join_no_existing_fuzzy_results( - left_df, - right_df, - left_col_name, - right_col_name, - temp_directory, - flow_logger - ).collect() - - # Verify results - assert result_df is not None - assert result_df.shape[0] > 0 - assert result_df.select(pl.len())[0, 0] == left_df.select(pl.len()).collect()[0, 0] * \ - right_df.select(pl.len()).collect()[0, 0] - - -def test_process_fuzzy_mapping_no_existing_matches(temp_directory, flow_logger): - left_df, right_df, mapping = create_test_data(20) - left_df = add_index_column(left_df, '__left_index', temp_directory) - right_df = add_index_column(right_df, '__right_index', temp_directory) - - fuzzy_map = mapping[0] - - result, _ = process_fuzzy_mapping(fuzzy_map=fuzzy_map, - left_df=left_df, - right_df=right_df, - existing_matches=None, - local_temp_dir_ref=temp_directory, - i=1, - flowfile_logger=flow_logger) - test_result = (result.join(left_df, on='__left_index') - .join(right_df, on='__right_index') - .select(["company_name", "organization", "fuzzy_score_1"]).collect()) - result = result.collect() - - # Assert that 
the result contains the expected columns - assert '__left_index' in result.columns - assert '__right_index' in result.columns - assert 'fuzzy_score_1' in result.columns - - # Verify result is not empty - assert result.shape[0] > 0 - - # Check that fuzzy scores are within expected range (0-100) - assert all(0 <= score <= 1 for score in result['fuzzy_score_1']) - - # Verify that the test_result has matched columns and reasonable values - assert test_result.shape[0] > 0 - assert all(isinstance(company, str) for company in test_result['company_name']) - assert all(isinstance(org, str) for org in test_result['organization']) - - # Check that high fuzzy scores correspond to similar strings - for row in test_result.iter_rows(named=True): - company = row['company_name'] - org = row['organization'] - score = row['fuzzy_score_1'] - - # If score is high (above threshold), company and org should be similar - if score >= fuzzy_map.threshold_score / 100: - # Basic similarity check - at least sharing the same prefix - assert len(company) > 0 and len(org) > 0 - - # For exact matches, the score should be very high - if company == org: - assert score == 1 # Expect very high scores for exact matches - - -def test_process_fuzzy_multiple_mappings(temp_directory, flow_logger): - left_df, right_df, mapping = create_test_data(50_000) - - left_df, right_df, mapping = pre_process_for_fuzzy_matching(left_df, right_df, mapping, flow_logger) - - left_df = add_index_column(left_df, '__left_index', temp_directory) - right_df = add_index_column(right_df, '__right_index', temp_directory) - - first_result, n_matches = process_fuzzy_mapping(fuzzy_map=mapping[0], - left_df=left_df, - right_df=right_df, - existing_matches=None, - local_temp_dir_ref=temp_directory, - i=1, - flowfile_logger=flow_logger, - existing_number_of_matches=None) - - second_result, n_matches = process_fuzzy_mapping(fuzzy_map=mapping[1], - left_df=left_df, - right_df=right_df, - existing_matches=first_result, - local_temp_dir_ref=temp_directory, - i=2, - flowfile_logger=flow_logger, - existing_number_of_matches=n_matches) - - third_result, n_matches = process_fuzzy_mapping(fuzzy_map=mapping[2], - left_df=left_df, - right_df=right_df, - existing_matches=second_result, - local_temp_dir_ref=temp_directory, - i=3, - flowfile_logger=flow_logger, - existing_number_of_matches=n_matches) - - first_count = first_result.select(pl.len()).collect()[0, 0] - second_count = second_result.select(pl.len()).collect()[0, 0] - third_count = third_result.select(pl.len()).collect()[0, 0] - assert first_count >= second_count >= third_count, "Expected decreasing number of matches" - - -def test_perform_all_fuzzy_matches(temp_directory, flow_logger): - left_df, right_df, mapping = create_test_data(10) - - left_df, right_df, mapping = pre_process_for_fuzzy_matching(left_df, right_df, mapping, flow_logger) - left_df = add_index_column(left_df, '__left_index', temp_directory) - right_df = add_index_column(right_df, '__right_index', temp_directory) - - all_matches = perform_all_fuzzy_matches(left_df, right_df, mapping, flow_logger, temp_directory) - assert len(all_matches) == len(mapping), "Expected one result per mapping" - - -def test_fuzzy_match_dfs(flow_logger): - left_df, right_df, mapping = generate_small_fuzzy_test_data() - result = fuzzy_match_dfs(left_df.lazy(), right_df.lazy(), mapping, flow_logger) - result = result.sort('id') - assert result is not None - expected_match_data = pl.DataFrame( - {'id': [1, 2, 3, 4, 5], 'company_name': ['Apple Inc.', 'Microsft', 'Amazon', 
'Gogle', 'Facebok'], - 'address': ['1 Apple Park', 'One Microsoft Way', '410 Terry Ave N', '1600 Amphitheatre', '1 Hacker Way'], - 'contact': ['Tim Cook', 'Satya Ndella', 'Andy Jessy', 'Sundar Pichai', 'Mark Zukerberg'], - 'fuzzy_score_0': [0.88, 0.9142857142857143, 0.8857142857142858, 0.8666666666666667, 0.9166666666666667], - 'fuzzy_score_1': [0.6666666666666667, 0.9230769230769231, 0.9, 1.0, 0.9333333333333333], - 'id_right': [101, 102, 103, 104, 105], - 'organization': ['Apple Incorporated', 'Microsoft Corp', 'Amazon.com Inc', 'Google LLC', 'Facebook Inc'], - 'location': ['Apple Park, Cupertino', 'Microsoft Way, Redmond', 'Terry Ave North, Seattle', - 'Amphitheatre Pkwy, Mountain View', 'Hacker Way, Menlo Park'], - 'ceo': ['Timothy Cook', 'Satya Nadella', 'Andy Jassy', 'Sundar Pichai', 'Mark Zuckerberg']} - - ) - assert result.equals(expected_match_data), "Unexpected match data" - - -def test_unique_df_large(temp_directory): - """Test the unique_df_large function for handling large dataframes with duplicates.""" - # Create a sample dataframe with intentional duplicates - data = { - "category": ["A", "A", "B", "B", "C"] * 20, # Categories with repetition - "value": [1, 1, 2, 2, 3] * 20, # Values with repetition - "id": list(range(100)) # Unique IDs to make rows distinct - } - df = pl.DataFrame(data) - - # Test with columns specified - result_with_cols = unique_df_large(df, cols=["category", "value"]) - - # Verify the results - assert result_with_cols.shape[0] == 3, "Expected 3 unique combinations of category and value" - assert set(result_with_cols["category"].to_list()) == {"A", "B", "C"}, "Unexpected categories" - assert set(result_with_cols["value"].to_list()) == {1, 2, 3}, "Unexpected values" - - # Test with default columns (all columns) - result_all_cols = unique_df_large(df) - - # Since we have unique IDs, each row should be unique when considering all columns - assert result_all_cols.shape[0] == 100, "Expected 100 unique rows when considering all columns" - - -def test_combine_matches(temp_directory): - """Test the combine_matches function for merging multiple match datasets.""" - # Create sample matching dataframes - match1 = pl.DataFrame({ - "__left_index": [0, 1, 2, 3], - "__right_index": [5, 6, 7, 8], - "fuzzy_score_0": [0.9, 0.8, 0.7, 0.6] - }).lazy() - - match2 = pl.DataFrame({ - "__left_index": [0, 1, 2, 3], - "__right_index": [5, 6, 7, 8], - "fuzzy_score_1": [0.85, 0.75, 0.65, 0.55] - }).lazy() - - match3 = pl.DataFrame({ - "__left_index": [0, 1], # Subset of matches to test joining behavior - "__right_index": [5, 6], - "fuzzy_score_2": [0.95, 0.92] - }).lazy() - - # Test combining all matches - matching_dfs = [match1, match2, match3] - result = combine_matches(matching_dfs).collect() - - # Verify structure and content - assert result.shape[0] == 2, "Expected 2 rows after combining (limited by match3)" - assert set(result.columns) == {"__left_index", "__right_index", "fuzzy_score_0", "fuzzy_score_1", - "fuzzy_score_2"}, "Unexpected columns" - - # Verify values for specific matches - first_match = result.filter(pl.col("__left_index") == 0).select( - ["fuzzy_score_0", "fuzzy_score_1", "fuzzy_score_2"]).row(0) - assert first_match == (0.9, 0.85, 0.95), "Unexpected scores for first match" - - # Test with empty list - with pytest.raises(IndexError): - combine_matches([]) - - # Test with single match dataframe - single_result = combine_matches([match1]).collect() - assert single_result.shape[0] == 4, "Expected 4 rows from single match" - assert set(single_result.columns) == 
{"__left_index", "__right_index", - "fuzzy_score_0"}, "Unexpected columns with single match" - - -def test_ensure_left_is_larger(): - """Test the ensure_left_is_larger function to verify it correctly swaps dataframes when necessary.""" - # Create test data with left larger than right - left_larger_df = pl.DataFrame({ - "id": list(range(20)), - "value": ["test"] * 20 - }) - right_smaller_df = pl.DataFrame({ - "id": list(range(10)), - "value": ["test"] * 10 - }) - - # Create test data with right larger than left - left_smaller_df = pl.DataFrame({ - "id": list(range(5)), - "value": ["test"] * 5 - }) - right_larger_df = pl.DataFrame({ - "id": list(range(15)), - "value": ["test"] * 15 - }) - - # Test case where left is already larger - result_df1, result_df2, result_col1, result_col2 = ensure_left_is_larger( - left_larger_df, right_smaller_df, "left_col", "right_col" - ) - - # Verify no swap occurred - assert result_df1.select(pl.len())[0, 0] == 20, "Left dataframe should still have 20 rows" - assert result_df2.select(pl.len())[0, 0] == 10, "Right dataframe should still have 10 rows" - assert result_col1 == "left_col", "Left column name should remain unchanged" - assert result_col2 == "right_col", "Right column name should remain unchanged" - - # Test case where right is larger and should be swapped - result_df1, result_df2, result_col1, result_col2 = ensure_left_is_larger( - left_smaller_df, right_larger_df, "left_col", "right_col" - ) - - # Verify swap occurred correctly - assert result_df1.select(pl.len())[0, 0] == 15, "Left dataframe should now have 15 rows (was right)" - assert result_df2.select(pl.len())[0, 0] == 5, "Right dataframe should now have 5 rows (was left)" - assert result_col1 == "right_col", "Left column name should now be right_col" - assert result_col2 == "left_col", "Right column name should now be left_col" - - -def test_split_single_df(): - left_df, right_df, fuzzy_map = create_test_data(100_000) - all_dfs = split_dataframe(left_df.collect(), 100_000) - assert len(all_dfs) == 1, "Expected a single dataframe with all rows" - - -def test_split_dataframe(): - left_df, right_df, fuzzy_map = create_test_data(100_000) - all_dfs = split_dataframe(left_df.collect(), 10_000) - assert len(all_dfs) == 10, "Expected 10 dataframes with 10,000 rows each" - assert all(len(df) == 10_000 for df in all_dfs), "Expected all dataframes to have 10,000 rows" diff --git a/flowfile_worker/tests/polars_fuzzy_match/test_pre_process.py b/flowfile_worker/tests/polars_fuzzy_match/test_pre_process.py deleted file mode 100644 index d6d263f2..00000000 --- a/flowfile_worker/tests/polars_fuzzy_match/test_pre_process.py +++ /dev/null @@ -1,137 +0,0 @@ -import sys -import os -import tempfile -from typing import List - -# Set up import paths -try: - sys.path.append(os.path.dirname(os.path.abspath(__file__))) -except NameError: - sys.path.append(os.path.dirname(os.path.abspath('flowfile_worker/tests/polars_fuzzy_match/test_matcher.py'))) - -from match_utils import (generate_small_fuzzy_test_data_left, generate_small_fuzzy_test_data_right, - create_test_data, create_fuzzy_maps) - -import polars as pl -import pytest -import logging - -from flowfile_worker.polars_fuzzy_match.process import process_fuzzy_frames - -# Import functions to test -from flowfile_worker.polars_fuzzy_match.pre_process import ( - get_approx_uniqueness, - calculate_uniqueness, - calculate_df_len, - fill_perc_unique_in_fuzzy_maps, - determine_order_of_fuzzy_maps, - calculate_uniqueness_rate, - determine_need_for_aggregation, - 
aggregate_output, - pre_process_for_fuzzy_matching -) - - -@pytest.fixture -def sample_dataframe(): - """Create a sample DataFrame for testing.""" - data = { - "name": ["John", "Alice", "Bob", "Charlie"], - "age": [30, 25, 35, 40], - "city": ["New York", "Boston", "Chicago", "Seattle"], - 'country': ['USA', 'USA', 'USA', 'Canada'] - } - return pl.DataFrame(data).lazy() - - -@pytest.fixture -def flow_logger(): - return logging.getLogger('sample') - -@pytest.fixture -def temp_directory(): - """Create a real temporary directory that will be cleaned up after the test.""" - with tempfile.TemporaryDirectory() as temp_dir: - print(f"Created temporary directory: {temp_dir}") - yield temp_dir - print("Temporary directory cleaned up") - - -def test_get_approx_uniqueness(sample_dataframe): - uniqueness = get_approx_uniqueness(sample_dataframe) - assert uniqueness == {'name': 4, 'age': 4, 'city': 4, 'country': 2} - - -def test_calculate_uniqueness(): - assert calculate_uniqueness(0.5, 0.5) == 0.75 - assert calculate_uniqueness(0.6, 0.8) == 1.3000000000000003 - assert calculate_uniqueness(0.6, 0.5) == 0.905 - assert calculate_uniqueness(0.1, 0.3) == 0.35 - - -def test_calculate_df_len(sample_dataframe): - assert calculate_df_len(sample_dataframe) == 4 - - -def test_fill_perc_unique_in_fuzzy_maps(sample_dataframe, flow_logger): - left_df, right_df, fuzzy_maps = create_test_data() - left_df_len = calculate_df_len(left_df) - right_df_len = calculate_df_len(right_df) - fuzzy_maps = fill_perc_unique_in_fuzzy_maps(left_df, right_df, fuzzy_maps, flow_logger, left_df_len, right_df_len) - fuzzy_map_1 = fuzzy_maps[0] - fuzzy_map_2 = fuzzy_maps[1] - fuzzy_map_3 = fuzzy_maps[2] - assert fuzzy_map_3.perc_unique < 1 - assert fuzzy_map_2.perc_unique > 1 - assert fuzzy_map_1.perc_unique > 1 - - -def test_determine_order_of_fuzzy_maps(): - fuzzy_maps = create_fuzzy_maps() - - fuzzy_maps = determine_order_of_fuzzy_maps(fuzzy_maps) - assert fuzzy_maps[0].perc_unique > fuzzy_maps[1].perc_unique > fuzzy_maps[2].perc_unique - - -def test_calculate_uniqueness_rate(): - fuzzy_maps = create_fuzzy_maps() - uniqueness_score_full = calculate_uniqueness_rate(fuzzy_maps) - uniqueness_score_first_two = calculate_uniqueness_rate(fuzzy_maps[:2]) - uniqueness_score_last_one = calculate_uniqueness_rate(fuzzy_maps[2:]) - assert uniqueness_score_full > 1.2 - assert uniqueness_score_last_one < 1.2 - assert uniqueness_score_first_two < uniqueness_score_full - assert uniqueness_score_last_one < uniqueness_score_full - - -def test_determine_need_for_aggregation(): - uniqueness_score = 0.5 - assert determine_need_for_aggregation(uniqueness_score, 1_0200_000) is True - assert determine_need_for_aggregation(uniqueness_score, 1_000_000) is False - - -def test_aggregate_output(): - left_df, right_df, fuzzy_maps = create_test_data() - left_df_unique, right_df_unique = aggregate_output(left_df, right_df, fuzzy_maps[2:]) - left_n_vals = left_df.select(fuzzy_maps[2].left_col).unique().select(pl.len()).collect()[0, 0] - right_n_vals = right_df.select(fuzzy_maps[2].right_col).unique().select(pl.len()).collect()[0, 0] - assert left_df_unique.select(pl.len()).collect()[0, 0] == left_n_vals - assert right_df_unique.select(pl.len()).collect()[0, 0] == right_n_vals - - -def test_process_fuzzy_mapping_no_uniqueness(flow_logger): - left_df, right_df, mapping = create_test_data(100000) - mapping = mapping[2:] - - left_df_prep, right_df_prep, mapping = pre_process_for_fuzzy_matching(left_df, right_df, mapping, flow_logger) - assert 
left_df_prep.collect().shape[0] == left_df.select(mapping[0].left_col).unique().collect().shape[0] - assert right_df_prep.collect().shape[0] == right_df.select(mapping[0].right_col).unique().collect().shape[0] - assert left_df_prep.collect().shape[0] < left_df.select(mapping[0].left_col).collect().shape[0] - assert right_df_prep.collect().shape[0] < right_df.select(mapping[0].right_col).collect().shape[0] - -def test_process_fuzzy_mapping_uniqueness(flow_logger): - left_df, right_df, mapping = create_test_data(100000) - left_df_prep, right_df_prep, mapping = pre_process_for_fuzzy_matching(left_df, right_df, mapping, flow_logger) - assert left_df_prep.collect().shape[0] == left_df.select(pl.first()).collect().shape[0] - assert right_df_prep.collect().shape[0] == right_df.select(pl.first()).collect().shape[0] - diff --git a/flowfile_worker/tests/polars_fuzzy_match/test_process.py b/flowfile_worker/tests/polars_fuzzy_match/test_process.py deleted file mode 100644 index fdc9bfd9..00000000 --- a/flowfile_worker/tests/polars_fuzzy_match/test_process.py +++ /dev/null @@ -1,254 +0,0 @@ -import sys -import os -from typing import List - -# Set up import paths -try: - sys.path.append(os.path.dirname(os.path.abspath(__file__))) -except NameError: - sys.path.append(os.path.dirname(os.path.abspath('flowfile_worker/tests/polars_fuzzy_match/test_process.py'))) - -import polars as pl -import pytest -from match_utils import generate_small_fuzzy_test_data_left, generate_small_fuzzy_test_data_right -from flowfile_worker.polars_fuzzy_match.process import ( - calculate_fuzzy_score, - calculate_and_parse_fuzzy, - process_fuzzy_frames -) -from flowfile_worker.polars_fuzzy_match.models import FuzzyTypeLiteral - -# Test configuration -FUZZY_TYPES: List[FuzzyTypeLiteral] = [ - 'levenshtein', 'jaro', 'jaro_winkler', - 'hamming', 'damerau_levenshtein', 'indel' -] -THRESHOLD_VALUES: List[float] = [0.3, 0.5, 0.7, 0.9] - - -@pytest.fixture(autouse=True) -def no_caching(monkeypatch): - """ - Disable caching for all tests by mocking cache-related functions. 
- """ - - def _cache_polars_frame_to_temp(df, tempdir=None): - print("Global caching disabled") - return df.lazy() if isinstance(df, pl.DataFrame) else df - - try: - # Import and patch utils module - import flowfile_worker.polars_fuzzy_match.utils - monkeypatch.setattr( - flowfile_worker.polars_fuzzy_match.utils, - 'cache_polars_frame_to_temp', - _cache_polars_frame_to_temp - ) - monkeypatch.setattr( - flowfile_worker.polars_fuzzy_match.utils, - 'write_polars_frame', - lambda df, path: True - ) - except (ImportError, AttributeError) as e: - print(f"Warning: Unable to patch caching functions: {e}") - - -# Test fixtures -@pytest.fixture -def small_test_data() -> pl.LazyFrame: - """Create a small dataset for fuzzy matching tests.""" - test_data = { - "left_name": ["John", "Johan", "Johannes", "Edward", "Edwin", "Smith", "Simpson", "Thompson"], - "right_name": ["Johny", "Doris", "John", "Eduward", "Edwin", "Smyth", "Simson", "Thomson"] - } - return pl.LazyFrame(test_data) - - -@pytest.fixture -def test_data_with_indices() -> pl.LazyFrame: - """Create test data with left_index and right_index columns.""" - test_data = { - "left_name": ["John", "Johan", "Johannes", "Edward"], - "right_name": ["Johny", "Doris", "John", "Eduward"], - "__left_index": [[1, 2], [3, 4], [5, 6], [7, 8]], - "__right_index": [[10, 20], [30, 40], [50, 60], [70, 80]] - } - return pl.LazyFrame(test_data) - - -@pytest.fixture -def test_data_empty_indices() -> pl.LazyFrame: - """Create test data with some empty indices to test edge cases.""" - test_data = { - "left_name": ["John", "Johan", "Johannes"], - "right_name": ["Johny", "Doris", "John"], - "__left_index": [[], [1, 2], [3]], - "__right_index": [[10], [20, 30], []] - } - return pl.LazyFrame(test_data) - - -# Test functions -@pytest.mark.parametrize("fuzzy_type", FUZZY_TYPES) -@pytest.mark.parametrize("threshold", THRESHOLD_VALUES) -def test_calculate_fuzzy_score_all_types(small_test_data: pl.LazyFrame, fuzzy_type: FuzzyTypeLiteral, threshold: float): - """Test all fuzzy matching algorithms with various thresholds.""" - result_df = calculate_fuzzy_score( - small_test_data, - "left_name", - "right_name", - fuzzy_type, - threshold - ).collect() - - # Basic validation - assert not result_df.is_empty() - assert "s" in result_df.columns - - # Validate score ranges - score_col = result_df["s"] - assert score_col.min() >= 0.0 - assert score_col.max() <= 1.0 - - # Validate threshold filtering - if threshold > 0: - filtered_scores = score_col.filter(score_col >= threshold) - assert filtered_scores.min() >= threshold - - -def test_calculate_and_parse_fuzzy(test_data_with_indices): - """Test the calculate_and_parse_fuzzy function.""" - fuzzy_method: FuzzyTypeLiteral = "levenshtein" - threshold = 0.2 - - # Expected result - expected_data = pl.DataFrame( - {'s': [0.8, 0.8, 0.8, 0.8, 0.8571428571428572, 0.8571428571428572, 0.8571428571428572, 0.8571428571428572], - '__left_index': [1, 1, 2, 2, 7, 7, 8, 8], - '__right_index': [10, 20, 10, 20, 70, 80, 70, 80]} - ) - - # Execute function and collect results - result_df = calculate_and_parse_fuzzy( - test_data_with_indices, - "left_name", - "right_name", - fuzzy_method, - threshold - ).collect() - - # Basic validation - assert not result_df.is_empty() - assert "s" in result_df.columns - assert "__left_index" in result_df.columns - assert "__right_index" in result_df.columns - - # Check explode worked properly - for row in result_df.iter_rows(named=True): - assert isinstance(row["__left_index"], (int, float)) - assert 
isinstance(row["__right_index"], (int, float)) - - # Validate score ranges - score_col = result_df["s"] - assert score_col.min() >= 0.8 - assert score_col.max() <= 1.0 - assert all(score >= threshold for score in score_col) - - # Verify matches expected data - assert len(result_df) >= 1 - assert result_df.equals(expected_data, null_equal=True) - - -def test_calculate_and_parse_fuzzy_empty_indices(test_data_empty_indices): - """Test calculate_and_parse_fuzzy with some empty indices.""" - fuzzy_method: FuzzyTypeLiteral = "levenshtein" - threshold = 0.2 - - # Execute function and collect results - result_df = calculate_and_parse_fuzzy( - test_data_empty_indices, - "left_name", - "right_name", - fuzzy_method, - threshold - ).collect() - - # Basic validation - assert "s" in result_df.columns - assert "__left_index" in result_df.columns - assert "__right_index" in result_df.columns - - # Check score ranges for any remaining rows - if not result_df.is_empty(): - score_col = result_df["s"] - assert score_col.min() >= 0.8 - assert score_col.max() <= 1.0 - assert all(score >= threshold for score in score_col) - - -def test_process_fuzzy_frames(monkeypatch): - """Test the process_fuzzy_frames function.""" - # Import and patch modules directly for this test - from flowfile_worker.polars_fuzzy_match import process - - # Create and apply mock functions - def mock_cache_polars_frame_to_temp(df, tempdir=None): - print("Mock cache_polars_frame_to_temp called") - return df - - def mock_collect_lazy_frame(df): - print("Mock collect_lazy_frame called") - return df.collect() - - monkeypatch.setattr(process, 'cache_polars_frame_to_temp', mock_cache_polars_frame_to_temp) - monkeypatch.setattr(process, 'collect_lazy_frame', mock_collect_lazy_frame) - - # Prepare test data - temp_dir_ref = "/temp" - left_df = (generate_small_fuzzy_test_data_left().lazy() - .with_columns(pl.col("id") - .map_elements(lambda x: [x], return_dtype=pl.List(pl.Int64)) - .alias("__left_index"))) - right_df = (generate_small_fuzzy_test_data_right().lazy() - .with_columns(pl.col("id") - .map_elements(lambda x: [x], return_dtype=pl.List(pl.Int64)) - .alias("__right_index"))) - - # Define column names - left_col_name = "company_name" - right_col_name = "organization" - - # Execute the function - left_fuzzy_frame, right_fuzzy_frame, result_left_col, result_right_col, len_left, len_right = process_fuzzy_frames( - left_df, - right_df, - left_col_name, - right_col_name, - temp_dir_ref - ) - - # Verify results - assert left_fuzzy_frame is not None - assert right_fuzzy_frame is not None - - # Collect and check content - left_collected = left_fuzzy_frame.collect() - right_collected = right_fuzzy_frame.collect() - - # Verify columns - assert result_left_col in left_collected.columns - assert "__left_index" in left_collected.columns or "__right_index" in left_collected.columns - assert result_right_col in right_collected.columns - assert "__left_index" in right_collected.columns or "__right_index" in right_collected.columns - - # Verify lengths - assert len_left > 0 - assert len_right > 0 - - # Verify swap logic - if len_left < len_right: - raise AssertionError("Expected left frame to be larger than or equal to right frame after possible swap") - - # Check that null values were filtered - assert left_collected.filter(pl.col(result_left_col).is_null()).height == 0 - assert right_collected.filter(pl.col(result_right_col).is_null()).height == 0 \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index f07b0908..4915769f 100644 --- 
a/pyproject.toml +++ b/pyproject.toml @@ -57,6 +57,7 @@ cryptography = "^45.0.5" httpx = "^0.28.1" tqdm = "^4.67.1" s3fs = "^2025.7.0" +pl-fuzzy-frame-match = ">=0.4.0" [tool.poetry.scripts]
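
The in-tree polars_fuzzy_match tests deleted above exercised the FuzzyMapping / fuzzy_match_dfs interface that the pl-fuzzy-frame-match dependency added here is expected to provide instead. Below is a minimal sketch of driving that interface, assuming the external package is importable as pl_fuzzy_frame_match and keeps the call shape used in the deleted test_fuzzy_match_dfs (two LazyFrames, a list of FuzzyMapping objects, and a logger); the column names and FuzzyMapping fields are taken from the deleted match_utils helpers.

import logging

import polars as pl
# Assumption: the PyPI package pl-fuzzy-frame-match is importable as
# pl_fuzzy_frame_match and exposes the same names the deleted tests used.
from pl_fuzzy_frame_match import FuzzyMapping, fuzzy_match_dfs

logger = logging.getLogger("fuzzy_match_example")

left_lf = pl.LazyFrame({
    "company_name": ["Apple Inc.", "Microsft", "Amazon"],
    "address": ["1 Apple Park", "One Microsoft Way", "410 Terry Ave N"],
})
right_lf = pl.LazyFrame({
    "organization": ["Apple Incorporated", "Microsoft Corp", "Amazon.com Inc"],
    "location": ["Apple Park, Cupertino", "Microsoft Way, Redmond", "Terry Ave North, Seattle"],
})

# Field names mirror the FuzzyMapping construction in the deleted match_utils
# helper; threshold_score is expressed on a 0-100 scale there, while the
# fuzzy_score_* columns in the matched output are in the 0-1 range.
fuzzy_maps = [
    FuzzyMapping(left_col="company_name", right_col="organization",
                 fuzzy_type="levenshtein", threshold_score=80.0),
    FuzzyMapping(left_col="address", right_col="location",
                 fuzzy_type="jaro_winkler", threshold_score=75.0),
]

# Call shape taken from the deleted test_fuzzy_match_dfs; whether the external
# package still accepts a logger as the fourth argument is an assumption.
matched = fuzzy_match_dfs(left_lf, right_lf, fuzzy_maps, logger)
print(matched)

As in the expectations of the deleted test_fuzzy_match_dfs, the matched frame carries the left columns, the right columns (suffixed where names collide, e.g. id_right), and one fuzzy_score_<i> column per mapping; the sketch above only prints it rather than asserting on its contents.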