diff --git a/flowfile_core/flowfile_core/configs/settings.py b/flowfile_core/flowfile_core/configs/settings.py
index 2089b79a..19942a5c 100644
--- a/flowfile_core/flowfile_core/configs/settings.py
+++ b/flowfile_core/flowfile_core/configs/settings.py
@@ -15,10 +15,12 @@
 DEFAULT_SERVER_HOST = "0.0.0.0"
 DEFAULT_SERVER_PORT = 63578
 DEFAULT_WORKER_PORT = 63579
-SINGLE_FILE_MODE: bool = os.environ.get("FLOWFILE_SINGLE_FILE_MODE", "0") == "1"
+# Single-file mode flag; determines where worker requests are sent.
+SINGLE_FILE_MODE: MutableBool = MutableBool(os.environ.get("FLOWFILE_SINGLE_FILE_MODE", "0") == "1")
 
-OFFLOAD_TO_WORKER = MutableBool(not SINGLE_FILE_MODE)
+# Offload-to-worker flag; determines whether processing tasks are handled by the worker process.
+OFFLOAD_TO_WORKER: MutableBool = MutableBool(os.environ.get("FLOWFILE_OFFLOAD_TO_WORKER", "1") == "1")
 
 
 def parse_args():
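Both flags are read from the environment at import time, so they must be set before flowfile_core is first imported. A minimal sketch of the intended round trip (hypothetical usage; it relies only on names introduced in this patch):

    import os

    os.environ["FLOWFILE_OFFLOAD_TO_WORKER"] = "0"  # must happen before flowfile_core is imported

    from flowfile_core.schemas.schemas import get_global_execution_location

    assert get_global_execution_location() == "local"  # offloading disabled -> run in-process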
""" - n_records = min(n_rows, self.get_number_of_records(calculate_in_worker_process=OFFLOAD_TO_WORKER)) logging.info(f'Getting sample of {n_rows} rows') if random: @@ -1585,12 +1585,17 @@ def get_sample(self, n_rows: int = 100, random: bool = False, shuffle: bool = Fa self.collect_external() if self.lazy and shuffle: - sample_df = self.data_frame.collect(engine="streaming" if self._streamable else "auto").sample(n_rows, - seed=seed, - shuffle=shuffle) + sample_df = (self.data_frame.collect(engine="streaming" if self._streamable else "auto") + .sample(n_rows, seed=seed, shuffle=shuffle)) elif shuffle: sample_df = self.data_frame.sample(n_rows, seed=seed, shuffle=shuffle) else: + if execution_location is None: + execution_location = get_global_execution_location() + n_rows = min(n_rows, self.get_number_of_records( + calculate_in_worker_process=execution_location == "remote") + ) + every_n_records = ceil(self.number_of_records / n_rows) sample_df = self.data_frame.gather_every(every_n_records) else: @@ -1598,7 +1603,7 @@ def get_sample(self, n_rows: int = 100, random: bool = False, shuffle: bool = Fa self.collect(n_rows) sample_df = self.data_frame.head(n_rows) - return FlowDataEngine(sample_df, schema=self.schema, number_of_records=n_records) + return FlowDataEngine(sample_df, schema=self.schema) def get_subset(self, n_rows: int = 100) -> "FlowDataEngine": """Gets the first `n_rows` from the DataFrame. @@ -1722,26 +1727,14 @@ def do_cross_join(self, cross_join_input: transform_schemas.CrossJoinInput, left = self.data_frame.select(left_select).rename(cross_join_input.left_select.rename_table) right = other.data_frame.select(right_select).rename(cross_join_input.right_select.rename_table) - if verify_integrity: - n_records = self.get_number_of_records() * other.get_number_of_records() - if n_records > 1_000_000_000: - raise Exception("Join will result in too many records, ending process") - else: - n_records = -1 - joined_df = left.join(right, how='cross') cols_to_delete_after = [col.new_name for col in cross_join_input.left_select.renames + cross_join_input.left_select.renames if col.join_key and not col.keep and col.is_available] - if verify_integrity: - return FlowDataEngine(joined_df.drop(cols_to_delete_after), calculate_schema_stats=False, - number_of_records=n_records, streamable=False) - else: - fl = FlowDataEngine(joined_df.drop(cols_to_delete_after), calculate_schema_stats=False, - number_of_records=0, streamable=False) - return fl + fl = FlowDataEngine(joined_df.drop(cols_to_delete_after), calculate_schema_stats=False, streamable=False) + return fl def join(self, join_input: transform_schemas.JoinInput, auto_generate_selection: bool, verify_integrity: bool, other: "FlowDataEngine") -> "FlowDataEngine": @@ -1877,7 +1870,7 @@ def assert_equal(self, other: "FlowDataEngine", ordered: bool = True, strict_sch other.number_of_records = -1 other = other.select_columns(self.columns) - if self.get_number_of_records() != other.get_number_of_records(): + if self.get_number_of_records_in_process() != other.get_number_of_records_in_process(): raise Exception('Number of records is not equal') if self.columns != other.columns: @@ -1913,6 +1906,18 @@ def _calculate_number_of_records_in_worker(self) -> int: ).result return number_of_records + def get_number_of_records_in_process(self, force_calculate: bool = False): + """ + Get the number of records in the DataFrame in the local process. + + args: + force_calculate: If True, forces recalculation even if a value is cached. 
diff --git a/flowfile_core/flowfile_core/flowfile/flow_data_engine/subprocess_operations/subprocess_operations.py b/flowfile_core/flowfile_core/flowfile/flow_data_engine/subprocess_operations/subprocess_operations.py
index 7f1b5dc6..1e4fd5d8 100644
--- a/flowfile_core/flowfile_core/flowfile/flow_data_engine/subprocess_operations/subprocess_operations.py
+++ b/flowfile_core/flowfile_core/flowfile/flow_data_engine/subprocess_operations/subprocess_operations.py
@@ -123,6 +123,8 @@ def results_exists(file_ref: str):
         return False
     except requests.RequestException as e:
         logger.error(f"Failed to check results existence: {str(e)}")
+        if "Connection refused" in str(e):
+            logger.info("Connection refused: the worker service appears to be down, so no cached results could be found.")
         return False
diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py
index 1b20ad30..b3e4df9d 100644
--- a/flowfile_core/flowfile_core/flowfile/flow_graph.py
+++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py
@@ -180,7 +180,7 @@ class FlowGraph:
     start_datetime: datetime = None
     end_datetime: datetime = None
     nodes_completed: int = 0
-    flow_settings: schemas.FlowSettings = None
+    _flow_settings: schemas.FlowSettings = None
     flow_logger: FlowLogger
 
     def __init__(self,
@@ -204,7 +204,7 @@ def __init__(self,
         if isinstance(flow_settings, schemas.FlowGraphConfig):
             flow_settings = schemas.FlowSettings.from_flow_settings_input(flow_settings)
-        self.flow_settings = flow_settings
+        self._flow_settings = flow_settings
         self.uuid = str(uuid1())
         self.nodes_completed = 0
         self.start_datetime = None
@@ -229,7 +229,18 @@ def __init__(self,
         elif input_flow is not None:
             self.add_datasource(input_file=input_flow)
 
-        skip_nodes, execution_order = compute_execution_plan(nodes=self.nodes, flow_starts=self._flow_starts+self.get_implicit_starter_nodes())
+    @property
+    def flow_settings(self) -> schemas.FlowSettings:
+        return self._flow_settings
+
+    @flow_settings.setter
+    def flow_settings(self, flow_settings: schemas.FlowSettings):
+        if (
+            (self._flow_settings.execution_location != flow_settings.execution_location) or
+            (self._flow_settings.execution_mode != flow_settings.execution_mode)
+        ):
+            self.reset()
+        self._flow_settings = flow_settings
 
     def add_node_promise(self, node_promise: input_schema.NodePromise):
         """Adds a placeholder node to the graph that is not yet fully configured.
@@ -320,6 +331,7 @@ def print_tree(self):
         if not self._node_db:
             self.flow_logger.info("Empty flow graph")
             return
+
         # Build node information
         node_info = build_node_info(self.nodes)
 
@@ -339,7 +351,7 @@ def print_tree(self):
 
         # Track which nodes connect to what
         merge_points = define_node_connections(node_info)
-        
+
         # Build the flow paths
 
         # Find the maximum label length for each depth level
@@ -348,7 +360,7 @@ def print_tree(self):
             if depth in depth_groups:
                 max_len = max(len(node_info[nid].label) for nid in depth_groups[depth])
                 max_label_length[depth] = max_len
-        
+
         # Draw the paths
         drawn_nodes = set()
         merge_drawn = set()
@@ -1583,6 +1595,8 @@ def execution_location(self, execution_location: schemas.ExecutionLocationsLiter
         Args:
             execution_location: The execution location to set.
         """
+        if self.flow_settings.execution_location != execution_location:
+            self.reset()
         self.flow_settings.execution_location = execution_location
 
     def run_graph(self) -> RunInformation | None:
@@ -1949,4 +1963,4 @@ def delete_connection(graph, node_connection: input_schema.NodeConnection):
         to_node.delete_input_node(
             node_connection.output_connection.node_id,
             connection_type=node_connection.input_connection.connection_class,
-        )
\ No newline at end of file
+        )
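The property/setter pair above means that assigning settings with a different execution location or mode invalidates previously computed results. A hedged usage sketch (assumes an already constructed FlowGraph named graph; model_copy is the pydantic v2 copy helper):

    # Sketch only: `graph` is an existing FlowGraph instance.
    new_settings = graph.flow_settings.model_copy(update={"execution_location": "local"})
    graph.flow_settings = new_settings  # location changed, so the setter calls graph.reset()
    graph.flow_settings = new_settings  # identical settings: no reset the second time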
diff --git a/flowfile_core/flowfile_core/flowfile/flow_node/flow_node.py b/flowfile_core/flowfile_core/flowfile/flow_node/flow_node.py
index dcf61cab..3a8f1a18 100644
--- a/flowfile_core/flowfile_core/flowfile/flow_node/flow_node.py
+++ b/flowfile_core/flowfile_core/flowfile/flow_node/flow_node.py
@@ -5,7 +5,6 @@
 from flowfile_core.utils.arrow_reader import get_read_top_n
 from flowfile_core.schemas import input_schema, schemas
 from flowfile_core.configs.flow_logger import NodeLogger
-from flowfile_core.configs.settings import SINGLE_FILE_MODE, OFFLOAD_TO_WORKER
 from flowfile_core.schemas.output_model import TableExample, FileColumn, NodeData
 from flowfile_core.flowfile.utils import get_hash
 
@@ -681,7 +680,7 @@ def remove_cache(self):
         logger.warning('Not implemented')
 
     def needs_run(self, performance_mode: bool, node_logger: NodeLogger = None,
-                  execution_location: schemas.ExecutionLocationsLiteral = "auto") -> bool:
+                  execution_location: schemas.ExecutionLocationsLiteral = "remote") -> bool:
         """Determines if the node needs to be executed.
 
         The decision is based on its run state, caching settings, and execution mode.
""" - if execution_location == "local" or SINGLE_FILE_MODE: + if execution_location == "local": return False flow_logger = logger if node_logger is None else node_logger @@ -879,7 +878,7 @@ def execute_node(self, run_location: schemas.ExecutionLocationsLiteral, reset_ca if self.is_setup: node_logger.info(f'Starting to run {self.__name__}') if (self.needs_run(performance_mode, node_logger, run_location) or self.node_template.node_group == "output" - and not (run_location == 'local' or SINGLE_FILE_MODE)): + and not (run_location == 'local')): self.prepare_before_run() try: if ((run_location == 'remote' or (self.node_default.transform_type == 'wide') @@ -908,8 +907,14 @@ def execute_node(self, run_location: schemas.ExecutionLocationsLiteral, reset_ca node_logger=node_logger) else: self.results.errors = str(e) - node_logger.error(f'Error with running the node: {e}') - elif ((run_location == 'local' or SINGLE_FILE_MODE) and + if "Connection refused" in str(e) and "/submit_query/" in str(e): + node_logger.warning("There was an issue connecting to the remote worker, " + "ensure the worker process is running, " + "or change the settings to, so it executes locally") + node_logger.error("Could not execute in the remote worker. (Re)start the worker service, or change settings to local settings.") + else: + node_logger.error(f'Error with running the node: {e}') + elif ((run_location == 'local') and (not self.node_stats.has_run_with_current_setup or self.node_template.node_group == "output")): try: node_logger.info('Executing fully locally') diff --git a/flowfile_core/flowfile_core/flowfile/flow_node/models.py b/flowfile_core/flowfile_core/flowfile/flow_node/models.py index 67694aba..07184164 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_node/models.py +++ b/flowfile_core/flowfile_core/flowfile/flow_node/models.py @@ -108,14 +108,12 @@ class NodeStepSettings: streamable: If True, the node can process data in a streaming fashion. setup_errors: If True, indicates a non-blocking error occurred during setup. breaking_setup_errors: If True, indicates an error occurred that prevents execution. - execute_location: The preferred location for execution ('auto', 'local', 'remote'). 
""" cache_results: bool = False renew_schema: bool = True streamable: bool = True setup_errors: bool = False breaking_setup_errors: bool = False - execute_location: schemas.ExecutionLocationsLiteral = 'auto' class NodeStepInputs: diff --git a/flowfile_core/flowfile_core/flowfile/manage/compatibility_enhancements.py b/flowfile_core/flowfile_core/flowfile/manage/compatibility_enhancements.py index 035f40fa..4022400f 100644 --- a/flowfile_core/flowfile_core/flowfile/manage/compatibility_enhancements.py +++ b/flowfile_core/flowfile_core/flowfile/manage/compatibility_enhancements.py @@ -48,7 +48,7 @@ def ensure_compatibility(flow_storage_obj: schemas.FlowInformation, flow_path: s setattr(flow_storage_obj, 'flow_settings', flow_settings) flow_storage_obj = schemas.FlowInformation.model_validate(flow_storage_obj) elif not hasattr(getattr(flow_storage_obj, 'flow_settings'), 'execution_location'): - setattr(getattr(flow_storage_obj, 'flow_settings'), 'execution_location', 'auto') + setattr(getattr(flow_storage_obj, 'flow_settings'), 'execution_location', "remote") elif not hasattr(flow_storage_obj.flow_settings, 'is_running'): setattr(flow_storage_obj.flow_settings, 'is_running', False) setattr(flow_storage_obj.flow_settings, 'is_canceled', False) diff --git a/flowfile_core/flowfile_core/schemas/schemas.py b/flowfile_core/flowfile_core/schemas/schemas.py index be518928..c349dd85 100644 --- a/flowfile_core/flowfile_core/schemas/schemas.py +++ b/flowfile_core/flowfile_core/schemas/schemas.py @@ -1,8 +1,35 @@ from typing import Optional, List, Dict, Tuple, Any, Literal, Annotated from pydantic import BaseModel, field_validator, ConfigDict, Field, StringConstraints from flowfile_core.flowfile.utils import create_unique_id +from flowfile_core.configs.settings import OFFLOAD_TO_WORKER ExecutionModeLiteral = Literal['Development', 'Performance'] -ExecutionLocationsLiteral = Literal['auto', 'local', 'remote'] +ExecutionLocationsLiteral = Literal['local', 'remote'] + + +def get_global_execution_location() -> ExecutionLocationsLiteral: + """ + Calculates the default execution location based on the global settings + Returns + ------- + ExecutionLocationsLiteral where the current + """ + if OFFLOAD_TO_WORKER: + return "remote" + return "local" + + +def is_valid_execution_location_in_current_global_settings(execution_location: ExecutionLocationsLiteral) -> bool: + return not (get_global_execution_location() == "local" and execution_location == "remote") + + +def get_prio_execution_location(local_execution_location: ExecutionLocationsLiteral, + global_execution_location: ExecutionLocationsLiteral) -> ExecutionLocationsLiteral: + if local_execution_location == global_execution_location: + return local_execution_location + elif global_execution_location == "local" and local_execution_location == "remote": + return "local" + else: + return local_execution_location class FlowGraphConfig(BaseModel): @@ -16,7 +43,7 @@ class FlowGraphConfig(BaseModel): name (str): The name of the flow. path (str): The file path associated with the flow. execution_mode (ExecutionModeLiteral): The mode of execution ('Development' or 'Performance'). - execution_location (ExecutionLocationsLiteral): The location for execution ('auto', 'local', 'remote'). + execution_location (ExecutionLocationsLiteral): The location for execution ('local', 'remote'). 
""" flow_id: int = Field(default_factory=create_unique_id, description="Unique identifier for the flow.") description: Optional[str] = None @@ -24,7 +51,23 @@ class FlowGraphConfig(BaseModel): name: str = '' path: str = '' execution_mode: ExecutionModeLiteral = 'Performance' - execution_location: ExecutionLocationsLiteral = "auto" + execution_location: ExecutionLocationsLiteral = Field(default_factory=get_global_execution_location) + + @field_validator('execution_location', mode='before') + def validate_and_set_execution_location(cls, v: Optional[ExecutionLocationsLiteral]) -> ExecutionLocationsLiteral: + """ + Validates and sets the execution location. + 1. **If `None` is provided**: It defaults to the location determined by global settings. + 2. **If a value is provided**: It checks if the value is compatible with the global + settings. If not (e.g., requesting 'remote' when only 'local' is possible), + it corrects the value to a compatible one. + """ + if v is None: + return get_global_execution_location() + if v == "auto": + return get_global_execution_location() + + return get_prio_execution_location(v, get_global_execution_location()) class FlowSettings(FlowGraphConfig): diff --git a/flowfile_core/tests/flowfile/analytics/test_analytics_processor.py b/flowfile_core/tests/flowfile/analytics/test_analytics_processor.py index e97eaf22..4e2ba5d3 100644 --- a/flowfile_core/tests/flowfile/analytics/test_analytics_processor.py +++ b/flowfile_core/tests/flowfile/analytics/test_analytics_processor.py @@ -530,9 +530,6 @@ def test_analytics_processor_from_parquet_file_run_performance(): def test_analytics_processor_from_parquet_file_run_in_one_local_process(): - from flowfile_core.configs.settings import OFFLOAD_TO_WORKER - OFFLOAD_TO_WORKER.value = False - graph = create_graph() graph.flow_settings.execution_location = "local" @@ -549,4 +546,3 @@ def test_analytics_processor_from_parquet_file_run_in_one_local_process(): graph.run_graph() assert node_step.results.analysis_data_generator, 'The node should have to run' assert node_step.results.analysis_data_generator().__len__() == 10_000, 'There should be 1000 rows in the data' - OFFLOAD_TO_WORKER.value = True diff --git a/flowfile_core/tests/flowfile/test_flowfile.py b/flowfile_core/tests/flowfile/test_flowfile.py index 8649ccae..41483d2c 100644 --- a/flowfile_core/tests/flowfile/test_flowfile.py +++ b/flowfile_core/tests/flowfile/test_flowfile.py @@ -14,13 +14,25 @@ from flowfile_core.flowfile.flow_data_engine.flow_file_column.main import FlowfileColumn from flowfile_core.flowfile.schema_callbacks import pre_calculate_pivot_schema - import pytest from pathlib import Path from typing import List, Dict, Literal from copy import deepcopy from time import sleep +def find_parent_directory(target_dir_name,): + """Navigate up directories until finding the target directory""" + current_path = Path(__file__) + + while current_path != current_path.parent: + if current_path.name == target_dir_name: + return current_path + if current_path.name == target_dir_name: + return current_path + current_path = current_path.parent + + raise FileNotFoundError(f"Directory '{target_dir_name}' not found") + try: from tests.flowfile_core_test_utils import (is_docker_available, ensure_password_is_available) from tests.utils import ensure_cloud_storage_connection_is_available_and_get_connection @@ -253,11 +265,16 @@ def test_opening_parquet_file(flow_logger: FlowLogger): def test_running_performance_mode(): graph = create_graph() + from flowfile_core.configs.settings 
@@ -253,11 +265,16 @@ def test_opening_parquet_file(flow_logger: FlowLogger):
 
 def test_running_performance_mode():
     graph = create_graph()
     add_node_promise_on_type(graph, 'read', 1, 1)
-    received_table = input_schema.ReceivedTable(file_type='parquet', name='table.parquet',
-                                                path='flowfile_core/tests/support_files/data/table.parquet')
+    received_table = input_schema.ReceivedTable(
+        file_type='parquet', name='table.parquet',
+        path=str(find_parent_directory("Flowfile")/'flowfile_core/tests/support_files/data/table.parquet'))
     node_read = input_schema.NodeRead(flow_id=1, node_id=1, cache_data=False, received_file=received_table)
     graph.add_read(node_read)
     add_node_promise_on_type(graph, 'record_count', 2)
     connection = input_schema.NodeConnection.create_from_simple_input(1, 2)
     add_connection(graph, connection)
@@ -268,6 +285,7 @@ def test_running_performance_mode():
     graph.reset()
     graph.flow_settings.execution_mode = 'Development'
     slow = graph.run_graph()
+
     assert slow.node_step_result[1].run_time > fast.node_step_result[1].run_time, 'Performance mode should be faster'
 
 
@@ -319,11 +337,8 @@ def test_add_fuzzy_match():
 
 def test_add_fuzzy_match_lcoal():
-    from flowfile_core.configs.settings import OFFLOAD_TO_WORKER
-
     graph = create_graph()
     graph.flow_settings.execution_location = "local"
-    OFFLOAD_TO_WORKER.value = False
     input_data = [{'name': 'eduward'},
                   {'name': 'edward'},
                   {'name': 'courtney'}]
@@ -350,7 +365,6 @@ def test_add_fuzzy_match_lcoal():
                              'name_vs_name_right_levenshtein': [1.0, 0.8571428571428572, 1.0, 0.8571428571428572, 1.0]}
     )
     output_data.assert_equal(expected_data)
-    OFFLOAD_TO_WORKER.value = True
 
 
 def test_add_record_count():
@@ -1138,21 +1152,22 @@ def tracking_method(*args, **kwargs):
     handle_run_info(result)
 
 
+@pytest.mark.skipif(not is_docker_available(), reason="Docker is not available or not running, so the cloud write scenario cannot be tested")
 def test_complex_cloud_write_scenario():
+    ensure_cloud_storage_connection_is_available_and_get_connection()
     handler = FlowfileHandler()
-    flow_id = handler.import_flow(Path("flowfile_core/tests/support_files/flows/test_cloud_local.flowfile"))
+
+    flow_id = handler.import_flow(find_parent_directory("Flowfile") / "flowfile_core/tests/support_files/flows/test_cloud_local.flowfile")
     graph = handler.get_flow(flow_id)
     node = graph.get_node(3)
-    node.get_table_example(True)
-    graph.run_graph()
+    example_data = node.get_table_example(True)
+    assert example_data.number_of_columns == 4
+    run_info = graph.run_graph()
+    handle_run_info(run_info)
 
 
 def test_no_re_calculate_example_data_after_change_no_run():
-    from flowfile_core.configs.settings import OFFLOAD_TO_WORKER
-
-    OFFLOAD_TO_WORKER.value = False
-
     graph = get_dependency_example()
     graph.flow_settings.execution_location = "local"
     graph.run_graph()
@@ -1188,12 +1203,8 @@ def test_no_re_calculate_example_data_after_change_no_run():
 
     assert after_change_data_after_run != first_data, 'Data should be different after run'
 
-    OFFLOAD_TO_WORKER.value = True
-
 
 def test_add_fuzzy_match_only_local():
-    from flowfile_core.configs.settings import OFFLOAD_TO_WORKER
-    OFFLOAD_TO_WORKER.value = False
     graph = create_graph()
     graph.flow_settings.execution_location = "local"
     input_data = [{'name': 'eduward'},
@@ -1222,7 +1233,40 @@ def test_add_fuzzy_match_only_local():
                              'name_vs_name_right_levenshtein': [1.0, 0.8571428571428572, 1.0, 1.0, 0.8571428571428572]}
     )
     output_data.assert_equal(expected_data)
-    OFFLOAD_TO_WORKER.value = True
+
+
+def test_changes_execution_location():
+    settings = {'flow_id': 1, 'node_id': 1, 'pos_x': 304.8727272727273,
+                'pos_y': 549.5272727272727, 'is_setup': True, 'description': 'Test csv',
+                'received_file': {'id': None, 'name': 'fake_data.csv',
+                                  'path': str(find_parent_directory("Flowfile")/'flowfile_core/tests/support_files/data/fake_data.csv'),
+                                  'directory': None, 'analysis_file_available': False, 'status': None,
+                                  'file_type': 'csv', 'fields': [], 'reference': '', 'starting_from_line': 0,
+                                  'delimiter': ',', 'has_headers': True, 'encoding': 'utf-8', 'parquet_ref': None,
+                                  'row_delimiter': '', 'quote_char': '', 'infer_schema_length': 20000,
+                                  'truncate_ragged_lines': False, 'ignore_errors': False, 'sheet_name': None,
+                                  'start_row': 0, 'start_column': 0, 'end_row': 0, 'end_column': 0,
+                                  'type_inference': False}}
+    graph = create_graph()
+    add_node_promise_on_type(graph, 'read', 1)
+    input_file = input_schema.NodeRead(**settings)
+    graph.add_read(input_file)
+    run_info = graph.run_graph()
+    handle_run_info(run_info)
+    graph.add_select(select_settings=input_schema.NodeSelect(flow_id=1, node_id=2,
+                                                             select_input=[transform_schema.SelectInput("City")],
+                                                             keep_missing=True))
+    add_connection(graph, input_schema.NodeConnection.create_from_simple_input(1, 2))
+    explain_node_2 = graph.get_node(2).get_resulting_data().data_frame.explain()
+    assert "flowfile_core/tests/support_files/data/fake_data.csv" not in explain_node_2
+    graph.execution_location = "local"
+
+    explain_node_2 = graph.get_node(2).get_resulting_data().data_frame.explain()
+    # Now it should read from the actual source, since data is not cached by the external worker.
+    assert "flowfile_core/tests/support_files/data/fake_data.csv" in explain_node_2
 
 
 def test_fuzzy_match_schema_predict(flow_logger):
diff --git a/flowfile_core/tests/flowfile_core_test_utils.py b/flowfile_core/tests/flowfile_core_test_utils.py
index 04996bda..176fea72 100644
--- a/flowfile_core/tests/flowfile_core_test_utils.py
+++ b/flowfile_core/tests/flowfile_core_test_utils.py
@@ -7,7 +7,6 @@
 from flowfile_core.auth.models import SecretInput
 
-
 def is_docker_available():
     """Check if Docker is running."""
     if platform.system() == "Windows":
@@ -26,7 +25,6 @@ def ensure_password_is_available():
         store_secret(db, secret, 1)
 
 
-
 from contextlib import contextmanager
 
 @contextmanager
diff --git a/flowfile_frame/flowfile_frame/__init__.py b/flowfile_frame/flowfile_frame/__init__.py
index b7275535..9b8ba70b 100644
--- a/flowfile_frame/flowfile_frame/__init__.py
+++ b/flowfile_frame/flowfile_frame/__init__.py
@@ -1,9 +1,6 @@
 # flowframe/__init__.py
 """A Polars-like API for building ETL graphs."""
 
-from flowfile_core.configs.settings import OFFLOAD_TO_WORKER
-OFFLOAD_TO_WORKER.value = False
-
 # Core classes
 from flowfile_frame.flow_frame import FlowFrame  # noqa: F401
 from pl_fuzzy_frame_match.models import FuzzyMapping  # noqa: F401
diff --git a/flowfile_frontend/src/renderer/app/features/designer/components/HeaderButtons/HeaderButtons.vue b/flowfile_frontend/src/renderer/app/features/designer/components/HeaderButtons/HeaderButtons.vue
index dc4555e2..fd7a5a1c 100644
--- a/flowfile_frontend/src/renderer/app/features/designer/components/HeaderButtons/HeaderButtons.vue
+++ b/flowfile_frontend/src/renderer/app/features/designer/components/HeaderButtons/HeaderButtons.vue
@@ -67,6 +67,23 @@
+          <!-- Execution-location dropdown (el-select over executionLocationOptions); original markup lost in extraction -->
 
 const flowSettings = ref<FlowSettings | null>(null);
 const executionModes = ref(["Development", "Performance"]);
 
+interface ExecutionLocationOption {
+  key: ExecutionLocation;
+  label: string;
+}
+
+const executionLocationOptions = ref<ExecutionLocationOption[]>([
+  { key: "local", label: "Local" },
+  { key: "remote", label: "Remote" },
+]);
+
 const emit = defineEmits(["openFlow", "refreshFlow", "logs-start", "logs-stop"]);
 
 const loadFlowSettings = async () => {
diff --git a/flowfile_frontend/src/renderer/app/features/designer/components/HeaderButtons/run.vue b/flowfile_frontend/src/renderer/app/features/designer/components/HeaderButtons/run.vue
index 6046b853..5aa7635e 100644
--- a/flowfile_frontend/src/renderer/app/features/designer/components/HeaderButtons/run.vue
+++ b/flowfile_frontend/src/renderer/app/features/designer/components/HeaderButtons/run.vue
@@ -15,7 +15,7 @@
 import { defineProps, ref, onUnmounted } from "vue";
 import { useNodeStore } from "../../../../stores/column-store";
 import { RunInformation } from "../../baseNode/nodeInterfaces";
 import { ElNotification } from "element-plus";
-import { updateRunStatus } from "../../nodes/nodeLogic";
+import { updateRunStatus, getFlowSettings, FlowSettings } from "../../nodes/nodeLogic";
 import { VueFlowStore } from "@vue-flow/core";
 
 const nodeStore = useNodeStore();
@@ -57,12 +57,18 @@ interface NotificationConfig {
   type: "success" | "error";
 }
 
-const showNotification = (title: string, message: string, type?: "success" | "error") => {
+const showNotification = (
+  title: string,
+  message: string,
+  type?: "success" | "error",
+  dangerouslyUseHTMLString?: boolean,
+) => {
   ElNotification({
     title,
     message,
     type,
     position: "top-left",
+    dangerouslyUseHTMLString,
   });
 };
@@ -117,9 +123,34 @@ const checkRunStatus = async () => {
 };
 
 const runFlow = async () => {
+  const flowSettings: FlowSettings | null = await getFlowSettings(nodeStore.flow_id);
+  if (!flowSettings) {
+    throw new Error("Failed to retrieve flow settings");
+  }
+
+  const escapeHtml = (text: string): string => {
+    const div = document.createElement("div");
+    div.textContent = text;
+    return div.innerHTML;
+  };
+
   freezeFlow();
+
   nodeStore.resetNodeResult();
-  showNotification("Run started", "The flow started flowing");
+
+  const executionLocationText = flowSettings.execution_location === "local" ? "Local" : "Remote";
+
+  const escapedFlowName = escapeHtml(flowSettings.name);
+
+  const notificationMessage = `
+    <div>
+      <strong>Flow:</strong> "${escapedFlowName}"<br>
+      <strong>Mode:</strong> ${flowSettings.execution_mode}<br>
+      <strong>Location:</strong> ${executionLocationText}
+    </div>
+ `; + + showNotification("🚀 Flow Started", notificationMessage, undefined, true); try { await axios.post("/flow/run/", null, { diff --git a/flowfile_frontend/src/renderer/app/features/designer/nodes/nodeLogic.ts b/flowfile_frontend/src/renderer/app/features/designer/nodes/nodeLogic.ts index 94c9b7ae..e44d92ff 100644 --- a/flowfile_frontend/src/renderer/app/features/designer/nodes/nodeLogic.ts +++ b/flowfile_frontend/src/renderer/app/features/designer/nodes/nodeLogic.ts @@ -4,6 +4,8 @@ import { NodeData, nodeData as nodeDataRef, TableExample, RunInformation} from ' import { AxiosResponse } from 'axios'; export type ExecutionMode = 'Development' | 'Performance'; +export type ExecutionLocation = 'local' | 'remote'; + export interface FlowSettings { flow_id: number @@ -14,6 +16,7 @@ export interface FlowSettings { modified_on?: number path?: string execution_mode: ExecutionMode + execution_location: ExecutionLocation show_detailed_progress: boolean is_running: boolean }
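With the import-time override removed from flowfile_frame/__init__.py, execution location is now controlled per flow rather than by mutating the global flag. A hedged sketch of the pattern used throughout the updated tests (create_graph and handle_run_info are the test helpers referenced above):

    graph = create_graph()
    graph.flow_settings.execution_location = "local"  # pin this flow to in-process execution
    run_info = graph.run_graph()
    handle_run_info(run_info)  # test helper: raises if any node failed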