6 changes: 4 additions & 2 deletions flowfile_core/flowfile_core/configs/settings.py
@@ -15,10 +15,12 @@
DEFAULT_SERVER_HOST = "0.0.0.0"
DEFAULT_SERVER_PORT = 63578
DEFAULT_WORKER_PORT = 63579
SINGLE_FILE_MODE: bool = os.environ.get("FLOWFILE_SINGLE_FILE_MODE", "0") == "1"

# Single file mode flag; this determines where worker requests are sent.
SINGLE_FILE_MODE: MutableBool = MutableBool(os.environ.get("FLOWFILE_SINGLE_FILE_MODE", "0") == "1")

OFFLOAD_TO_WORKER = MutableBool(not SINGLE_FILE_MODE)
# Offload to worker flag; this determines whether the worker should handle processing tasks.
OFFLOAD_TO_WORKER: MutableBool = MutableBool(os.environ.get("FLOWFILE_OFFLOAD_TO_WORKER", "1") == "1")


def parse_args():
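A module-level plain bool is bound by value at import time (`from settings import SINGLE_FILE_MODE` freezes the importer's copy), which is why these flags are wrapped in MutableBool. A minimal sketch of such a wrapper, assuming the real implementation elsewhere in flowfile_core may differ:

class MutableBool:
    """Mutable boolean wrapper (assumed API; the real class may differ)."""
    def __init__(self, value: bool):
        self.value = bool(value)

    def __bool__(self) -> bool:
        # Lets `if OFFLOAD_TO_WORKER:` keep working on the wrapper object.
        return self.value

    def set(self, value: bool) -> None:
        # Mutation is shared: every module holding a reference sees the change.
        self.value = bool(value)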
@@ -21,12 +21,12 @@
from flowfile_core.configs import logger
from flowfile_core.utils.utils import ensure_similarity_dicts
from flowfile_core.configs.flow_logger import NodeLogger
from flowfile_core.configs.settings import OFFLOAD_TO_WORKER
from flowfile_core.schemas import (
cloud_storage_schemas,
input_schema,
transform_schema as transform_schemas
)
from flowfile_core.schemas.schemas import ExecutionLocationsLiteral, get_global_execution_location

# Local imports - Flow File Components
from flowfile_core.flowfile.flow_data_engine import utils
@@ -66,6 +66,7 @@

T = TypeVar('T', pl.DataFrame, pl.LazyFrame)


def _handle_duplication_join_keys(left_df: T, right_df: T, join_input: transform_schemas.JoinInput) -> Tuple[T, T, Dict[str, str]]:
"""Temporarily renames join keys to avoid conflicts during a join.

@@ -1565,40 +1566,44 @@ def __get_sample__(self, n_rows: int = 100, streamable: bool = True) -> "FlowDat
return FlowDataEngine(df, number_of_records=len(df), schema=self.schema)

def get_sample(self, n_rows: int = 100, random: bool = False, shuffle: bool = False,
seed: int = None) -> "FlowDataEngine":
seed: int = None, execution_location: Optional[ExecutionLocationsLiteral] = None) -> "FlowDataEngine":
"""Gets a sample of rows from the DataFrame.

Args:
n_rows: The number of rows to sample.
random: If True, performs random sampling. If False, takes the first n_rows.
shuffle: If True (and `random` is True), shuffles the data before sampling.
seed: A random seed for reproducibility.
execution_location: Where the size of the DataFrame is calculated ('local' or 'remote').

Returns:
A new `FlowDataEngine` instance containing the sampled data.
"""
n_records = min(n_rows, self.get_number_of_records(calculate_in_worker_process=OFFLOAD_TO_WORKER))
logging.info(f'Getting sample of {n_rows} rows')

if random:
if self.lazy and self.external_source is not None:
self.collect_external()

if self.lazy and shuffle:
sample_df = self.data_frame.collect(engine="streaming" if self._streamable else "auto").sample(n_rows,
seed=seed,
shuffle=shuffle)
sample_df = (self.data_frame.collect(engine="streaming" if self._streamable else "auto")
.sample(n_rows, seed=seed, shuffle=shuffle))
elif shuffle:
sample_df = self.data_frame.sample(n_rows, seed=seed, shuffle=shuffle)
else:
if execution_location is None:
execution_location = get_global_execution_location()
n_rows = min(n_rows, self.get_number_of_records(
calculate_in_worker_process=execution_location == "remote")
)

every_n_records = ceil(self.number_of_records / n_rows)
sample_df = self.data_frame.gather_every(every_n_records)
else:
if self.external_source:
self.collect(n_rows)
sample_df = self.data_frame.head(n_rows)

return FlowDataEngine(sample_df, schema=self.schema, number_of_records=n_records)
return FlowDataEngine(sample_df, schema=self.schema)
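A usage sketch of the new signature (constructing the engine directly from a Polars frame is an assumption based on the constructor calls above, and the import path is hypothetical):

import polars as pl
from flowfile_core.flowfile.flow_data_engine.flow_data_engine import FlowDataEngine

fde = FlowDataEngine(pl.DataFrame({"a": list(range(1_000))}))
first_ten = fde.get_sample(10)                                    # head of the frame
shuffled_ten = fde.get_sample(10, random=True, shuffle=True, seed=42)
# Force the row count used for every-nth sampling to be computed in-process:
spread_ten = fde.get_sample(10, random=True, execution_location="local")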

def get_subset(self, n_rows: int = 100) -> "FlowDataEngine":
"""Gets the first `n_rows` from the DataFrame.
@@ -1722,26 +1727,14 @@ def do_cross_join(self, cross_join_input: transform_schemas.CrossJoinInput,
left = self.data_frame.select(left_select).rename(cross_join_input.left_select.rename_table)
right = other.data_frame.select(right_select).rename(cross_join_input.right_select.rename_table)

if verify_integrity:
n_records = self.get_number_of_records() * other.get_number_of_records()
if n_records > 1_000_000_000:
raise Exception("Join will result in too many records, ending process")
else:
n_records = -1

joined_df = left.join(right, how='cross')

cols_to_delete_after = [col.new_name for col in
cross_join_input.left_select.renames + cross_join_input.right_select.renames
if col.join_key and not col.keep and col.is_available]

if verify_integrity:
return FlowDataEngine(joined_df.drop(cols_to_delete_after), calculate_schema_stats=False,
number_of_records=n_records, streamable=False)
else:
fl = FlowDataEngine(joined_df.drop(cols_to_delete_after), calculate_schema_stats=False,
number_of_records=0, streamable=False)
return fl
fl = FlowDataEngine(joined_df.drop(cols_to_delete_after), calculate_schema_stats=False, streamable=False)
return fl

def join(self, join_input: transform_schemas.JoinInput, auto_generate_selection: bool,
verify_integrity: bool, other: "FlowDataEngine") -> "FlowDataEngine":
@@ -1877,7 +1870,7 @@ def assert_equal(self, other: "FlowDataEngine", ordered: bool = True, strict_sch
other.number_of_records = -1
other = other.select_columns(self.columns)

if self.get_number_of_records() != other.get_number_of_records():
if self.get_number_of_records_in_process() != other.get_number_of_records_in_process():
raise Exception('Number of records is not equal')

if self.columns != other.columns:
@@ -1913,6 +1906,18 @@ def _calculate_number_of_records_in_worker(self) -> int:
).result
return number_of_records

def get_number_of_records_in_process(self, force_calculate: bool = False) -> int:
"""
Gets the number of records in the DataFrame, calculated in the local process.

Args:
force_calculate: If True, forces recalculation even if a value is cached.

Returns:
The total number of records.
"""
return self.get_number_of_records(force_calculate=force_calculate)
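A short usage sketch (assuming fde is a FlowDataEngine instance); the helper always counts in the current process, never in the worker:

n = fde.get_number_of_records_in_process()                             # reuses a cached count if one exists
n_fresh = fde.get_number_of_records_in_process(force_calculate=True)   # recount even if a value is cached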

def get_number_of_records(self, warn: bool = False, force_calculate: bool = False,
calculate_in_worker_process: bool = False) -> int:
"""Gets the total number of records in the DataFrame.
@@ -1932,7 +1937,6 @@ def get_number_of_records(self, warn: bool = False, force_calculate: bool = Fals
"""
if self.is_future and not self.is_collected:
return -1
calculate_in_worker_process = False if not OFFLOAD_TO_WORKER else calculate_in_worker_process
if self.number_of_records is None or self.number_of_records < 0 or force_calculate:
if self._number_of_records_callback is not None:
self._number_of_records_callback(self)
@@ -123,6 +123,8 @@ def results_exists(file_ref: str):
return False
except requests.RequestException as e:
logger.error(f"Failed to check results existence: {str(e)}")
if "Connection refused" in str(e):
logger.info("")
return False


26 changes: 20 additions & 6 deletions flowfile_core/flowfile_core/flowfile/flow_graph.py
@@ -180,7 +180,7 @@ class FlowGraph:
start_datetime: datetime = None
end_datetime: datetime = None
nodes_completed: int = 0
flow_settings: schemas.FlowSettings = None
_flow_settings: schemas.FlowSettings = None
flow_logger: FlowLogger

def __init__(self,
@@ -204,7 +204,7 @@ def __init__(self,
if isinstance(flow_settings, schemas.FlowGraphConfig):
flow_settings = schemas.FlowSettings.from_flow_settings_input(flow_settings)

self.flow_settings = flow_settings
self._flow_settings = flow_settings
self.uuid = str(uuid1())
self.nodes_completed = 0
self.start_datetime = None
@@ -229,7 +229,18 @@ def __init__(self,
elif input_flow is not None:
self.add_datasource(input_file=input_flow)

skip_nodes, execution_order = compute_execution_plan(nodes=self.nodes,flow_starts=self._flow_starts+self.get_implicit_starter_nodes())
@property
def flow_settings(self) -> schemas.FlowSettings:
return self._flow_settings

@flow_settings.setter
def flow_settings(self, flow_settings: schemas.FlowSettings):
if (
(self._flow_settings.execution_location != flow_settings.execution_location) or
(self._flow_settings.execution_mode != flow_settings.execution_mode)
):
self.reset()
self._flow_settings = flow_settings
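A sketch of what the setter now guarantees (assumes a constructed FlowGraph named graph; model_copy is pydantic v2's copy helper, which FlowSettings inherits as a BaseModel subclass):

new_settings = graph.flow_settings.model_copy(update={"execution_location": "local"})
graph.flow_settings = new_settings   # location changed, so the setter calls graph.reset()

unchanged = graph.flow_settings.model_copy()
graph.flow_settings = unchanged      # neither location nor mode changed; no reset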

def add_node_promise(self, node_promise: input_schema.NodePromise):
"""Adds a placeholder node to the graph that is not yet fully configured.
@@ -320,6 +331,7 @@ def print_tree(self):
if not self._node_db:
self.flow_logger.info("Empty flow graph")
return

# Build node information
node_info = build_node_info(self.nodes)

@@ -339,7 +351,7 @@

# Track which nodes connect to what
merge_points = define_node_connections(node_info)

# Build the flow paths

# Find the maximum label length for each depth level
@@ -348,7 +360,7 @@
if depth in depth_groups:
max_len = max(len(node_info[nid].label) for nid in depth_groups[depth])
max_label_length[depth] = max_len

# Draw the paths
drawn_nodes = set()
merge_drawn = set()
@@ -1583,6 +1595,8 @@ def execution_location(self, execution_location: schemas.ExecutionLocationsLiter
Args:
execution_location: The execution location to set.
"""
if self.flow_settings.execution_location != execution_location:
self.reset()
self.flow_settings.execution_location = execution_location

def run_graph(self) -> RunInformation | None:
@@ -1949,4 +1963,4 @@ def delete_connection(graph, node_connection: input_schema.NodeConnection):
to_node.delete_input_node(
node_connection.output_connection.node_id,
connection_type=node_connection.input_connection.connection_class,
)
)
17 changes: 11 additions & 6 deletions flowfile_core/flowfile_core/flowfile/flow_node/flow_node.py
@@ -5,7 +5,6 @@
from flowfile_core.utils.arrow_reader import get_read_top_n
from flowfile_core.schemas import input_schema, schemas
from flowfile_core.configs.flow_logger import NodeLogger
from flowfile_core.configs.settings import SINGLE_FILE_MODE, OFFLOAD_TO_WORKER

from flowfile_core.schemas.output_model import TableExample, FileColumn, NodeData
from flowfile_core.flowfile.utils import get_hash
@@ -681,7 +680,7 @@ def remove_cache(self):
logger.warning('Not implemented')

def needs_run(self, performance_mode: bool, node_logger: NodeLogger = None,
execution_location: schemas.ExecutionLocationsLiteral = "auto") -> bool:
execution_location: schemas.ExecutionLocationsLiteral = "remote") -> bool:
"""Determines if the node needs to be executed.

The decision is based on its run state, caching settings, and execution mode.
@@ -694,7 +693,7 @@ def needs_run(self, performance_mode: bool, node_logger: NodeLogger = None,
Returns:
True if the node should be run, False otherwise.
"""
if execution_location == "local" or SINGLE_FILE_MODE:
if execution_location == "local":
return False

flow_logger = logger if node_logger is None else node_logger
Expand Down Expand Up @@ -879,7 +878,7 @@ def execute_node(self, run_location: schemas.ExecutionLocationsLiteral, reset_ca
if self.is_setup:
node_logger.info(f'Starting to run {self.__name__}')
if (self.needs_run(performance_mode, node_logger, run_location) or self.node_template.node_group == "output"
and not (run_location == 'local' or SINGLE_FILE_MODE)):
and not (run_location == 'local')):
self.prepare_before_run()
try:
if ((run_location == 'remote' or (self.node_default.transform_type == 'wide')
@@ -908,8 +907,14 @@
node_logger=node_logger)
else:
self.results.errors = str(e)
node_logger.error(f'Error with running the node: {e}')
elif ((run_location == 'local' or SINGLE_FILE_MODE) and
if "Connection refused" in str(e) and "/submit_query/" in str(e):
node_logger.warning("There was an issue connecting to the remote worker, "
"ensure the worker process is running, "
"or change the settings to, so it executes locally")
node_logger.error("Could not execute in the remote worker. (Re)start the worker service, or change settings to local settings.")
else:
node_logger.error(f'Error with running the node: {e}')
elif ((run_location == 'local') and
(not self.node_stats.has_run_with_current_setup or self.node_template.node_group == "output")):
try:
node_logger.info('Executing fully locally')
2 changes: 0 additions & 2 deletions flowfile_core/flowfile_core/flowfile/flow_node/models.py
@@ -108,14 +108,12 @@ class NodeStepSettings:
streamable: If True, the node can process data in a streaming fashion.
setup_errors: If True, indicates a non-blocking error occurred during setup.
breaking_setup_errors: If True, indicates an error occurred that prevents execution.
execute_location: The preferred location for execution ('auto', 'local', 'remote').
"""
cache_results: bool = False
renew_schema: bool = True
streamable: bool = True
setup_errors: bool = False
breaking_setup_errors: bool = False
execute_location: schemas.ExecutionLocationsLiteral = 'auto'


class NodeStepInputs:
@@ -48,7 +48,7 @@ def ensure_compatibility(flow_storage_obj: schemas.FlowInformation, flow_path: s
setattr(flow_storage_obj, 'flow_settings', flow_settings)
flow_storage_obj = schemas.FlowInformation.model_validate(flow_storage_obj)
elif not hasattr(getattr(flow_storage_obj, 'flow_settings'), 'execution_location'):
setattr(getattr(flow_storage_obj, 'flow_settings'), 'execution_location', 'auto')
setattr(getattr(flow_storage_obj, 'flow_settings'), 'execution_location', "remote")
elif not hasattr(flow_storage_obj.flow_settings, 'is_running'):
setattr(flow_storage_obj.flow_settings, 'is_running', False)
setattr(flow_storage_obj.flow_settings, 'is_canceled', False)
49 changes: 46 additions & 3 deletions flowfile_core/flowfile_core/schemas/schemas.py
@@ -1,8 +1,35 @@
from typing import Optional, List, Dict, Tuple, Any, Literal, Annotated
from pydantic import BaseModel, field_validator, ConfigDict, Field, StringConstraints
from flowfile_core.flowfile.utils import create_unique_id
from flowfile_core.configs.settings import OFFLOAD_TO_WORKER
ExecutionModeLiteral = Literal['Development', 'Performance']
ExecutionLocationsLiteral = Literal['auto', 'local', 'remote']
ExecutionLocationsLiteral = Literal['local', 'remote']


def get_global_execution_location() -> ExecutionLocationsLiteral:
"""
Calculates the default execution location based on the global settings.

Returns
-------
ExecutionLocationsLiteral
The execution location implied by the current global settings.
"""
if OFFLOAD_TO_WORKER:
return "remote"
return "local"


def is_valid_execution_location_in_current_global_settings(execution_location: ExecutionLocationsLiteral) -> bool:
return not (get_global_execution_location() == "local" and execution_location == "remote")


def get_prio_execution_location(local_execution_location: ExecutionLocationsLiteral,
global_execution_location: ExecutionLocationsLiteral) -> ExecutionLocationsLiteral:
if local_execution_location == global_execution_location:
return local_execution_location
elif global_execution_location == "local" and local_execution_location == "remote":
return "local"
else:
return local_execution_location
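The precedence rule, doctest-style: the node-level choice wins, except that a 'remote' request is downgraded when the global location is 'local' (for instance, when no worker is available):

>>> get_prio_execution_location("remote", "remote")
'remote'
>>> get_prio_execution_location("local", "remote")   # an explicit local request is always honored
'local'
>>> get_prio_execution_location("remote", "local")   # downgraded: the global settings rule out a worker
'local'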


class FlowGraphConfig(BaseModel):
@@ -16,15 +43,31 @@ class FlowGraphConfig(BaseModel):
name (str): The name of the flow.
path (str): The file path associated with the flow.
execution_mode (ExecutionModeLiteral): The mode of execution ('Development' or 'Performance').
execution_location (ExecutionLocationsLiteral): The location for execution ('auto', 'local', 'remote').
execution_location (ExecutionLocationsLiteral): The location for execution ('local', 'remote').
"""
flow_id: int = Field(default_factory=create_unique_id, description="Unique identifier for the flow.")
description: Optional[str] = None
save_location: Optional[str] = None
name: str = ''
path: str = ''
execution_mode: ExecutionModeLiteral = 'Performance'
execution_location: ExecutionLocationsLiteral = "auto"
execution_location: ExecutionLocationsLiteral = Field(default_factory=get_global_execution_location)

@field_validator('execution_location', mode='before')
def validate_and_set_execution_location(cls, v: Optional[ExecutionLocationsLiteral]) -> ExecutionLocationsLiteral:
"""
Validates and sets the execution location.
1. **If `None` or the legacy `"auto"` is provided**: It defaults to the location determined by global settings.
2. **If a value is provided**: It checks if the value is compatible with the global
settings. If not (e.g., requesting 'remote' when only 'local' is possible),
it corrects the value to a compatible one.
"""
if v is None:
return get_global_execution_location()
if v == "auto":
return get_global_execution_location()

return get_prio_execution_location(v, get_global_execution_location())
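The validator's effect, doctest-style (assuming OFFLOAD_TO_WORKER is truthy, so the global location resolves to 'remote'):

>>> FlowGraphConfig().execution_location                             # default follows the global setting
'remote'
>>> FlowGraphConfig(execution_location="auto").execution_location    # legacy 'auto' remapped by the before-validator
'remote'
>>> FlowGraphConfig(execution_location="local").execution_location   # explicit 'local' is always honored
'local'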


class FlowSettings(FlowGraphConfig):
Expand Down