Skip to content

Commit c74d9da

Browse files
Edwardvaneechoud and Bernardo Fernandes
authored and committed
Feature/unify execution methods (Edwardvaneechoud#110)
* Removing auto to improve maintainability
* Ensure the offload per worker is determined per graph and there is no dependency on a global variable.
* Small improvement in logging
* Removing global change in tests
* Skipping test in docker
1 parent dc0e50b commit c74d9da

File tree

6 files changed

+82
-44
lines changed

6 files changed

+82
-44
lines changed

flowfile_core/flowfile_core/flowfile/flow_data_engine/flow_data_engine.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1937,7 +1937,6 @@ def get_number_of_records(self, warn: bool = False, force_calculate: bool = Fals
19371937
"""
19381938
if self.is_future and not self.is_collected:
19391939
return -1
1940-
calculate_in_worker_process = False if not OFFLOAD_TO_WORKER else calculate_in_worker_process
19411940
if self.number_of_records is None or self.number_of_records < 0 or force_calculate:
19421941
if self._number_of_records_callback is not None:
19431942
self._number_of_records_callback(self)

flowfile_core/flowfile_core/flowfile/flow_graph.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,18 @@ def __init__(self,
229229
elif input_flow is not None:
230230
self.add_datasource(input_file=input_flow)
231231

232-
skip_nodes, execution_order = compute_execution_plan(nodes=self.nodes,flow_starts=self._flow_starts+self.get_implicit_starter_nodes())
232+
@property
233+
def flow_settings(self) -> schemas.FlowSettings:
234+
return self._flow_settings
235+
236+
@flow_settings.setter
237+
def flow_settings(self, flow_settings: schemas.FlowSettings):
238+
if (
239+
(self._flow_settings.execution_location != flow_settings.execution_location) or
240+
(self._flow_settings.execution_mode != flow_settings.execution_mode)
241+
):
242+
self.reset()
243+
self._flow_settings = flow_settings
233244

234245
def add_node_promise(self, node_promise: input_schema.NodePromise):
235246
"""Adds a placeholder node to the graph that is not yet fully configured.
@@ -320,6 +331,7 @@ def print_tree(self):
320331
if not self._node_db:
321332
self.flow_logger.info("Empty flow graph")
322333
return
334+
323335
# Build node information
324336
node_info = build_node_info(self.nodes)
325337

@@ -339,7 +351,7 @@ def print_tree(self):
339351

340352
# Track which nodes connect to what
341353
merge_points = define_node_connections(node_info)
342-
354+
343355
# Build the flow paths
344356

345357
# Find the maximum label length for each depth level
@@ -348,7 +360,7 @@ def print_tree(self):
348360
if depth in depth_groups:
349361
max_len = max(len(node_info[nid].label) for nid in depth_groups[depth])
350362
max_label_length[depth] = max_len
351-
363+
352364
# Draw the paths
353365
drawn_nodes = set()
354366
merge_drawn = set()

flowfile_core/flowfile_core/flowfile/flow_node/flow_node.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
from flowfile_core.utils.arrow_reader import get_read_top_n
66
from flowfile_core.schemas import input_schema, schemas
77
from flowfile_core.configs.flow_logger import NodeLogger
8-
from flowfile_core.configs.settings import SINGLE_FILE_MODE, OFFLOAD_TO_WORKER
98

109
from flowfile_core.schemas.output_model import TableExample, FileColumn, NodeData
1110
from flowfile_core.flowfile.utils import get_hash
@@ -921,8 +920,14 @@ def execute_node(self, run_location: schemas.ExecutionLocationsLiteral, reset_ca
921920
node_logger=node_logger)
922921
else:
923922
self.results.errors = str(e)
924-
node_logger.error(f'Error with running the node: {e}')
925-
elif ((run_location == 'local' or SINGLE_FILE_MODE) and
923+
if "Connection refused" in str(e) and "/submit_query/" in str(e):
924+
node_logger.warning("There was an issue connecting to the remote worker, "
925+
"ensure the worker process is running, "
926+
"or change the settings to, so it executes locally")
927+
node_logger.error("Could not execute in the remote worker. (Re)start the worker service, or change settings to local settings.")
928+
else:
929+
node_logger.error(f'Error with running the node: {e}')
930+
elif ((run_location == 'local') and
926931
(not self.node_stats.has_run_with_current_setup or self.node_template.node_group == "output")):
927932
try:
928933
node_logger.info('Executing fully locally')

flowfile_core/tests/flowfile/analytics/test_analytics_processor.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -545,9 +545,6 @@ def test_analytics_processor_from_parquet_file_run_performance():
545545

546546

547547
def test_analytics_processor_from_parquet_file_run_in_one_local_process():
548-
from flowfile_core.configs.settings import OFFLOAD_TO_WORKER
549-
OFFLOAD_TO_WORKER.value = False
550-
551548
graph = create_graph()
552549

553550
graph.flow_settings.execution_location = "local"
@@ -564,4 +561,3 @@ def test_analytics_processor_from_parquet_file_run_in_one_local_process():
564561
graph.run_graph()
565562
assert node_step.results.analysis_data_generator, 'The node should have to run'
566563
assert node_step.results.analysis_data_generator().__len__() == 10_000, 'There should be 1000 rows in the data'
567-
OFFLOAD_TO_WORKER.value = True

flowfile_core/tests/flowfile/test_flowfile.py

Lines changed: 59 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,25 @@
1414
from flowfile_core.flowfile.flow_data_engine.flow_file_column.main import FlowfileColumn
1515
from flowfile_core.flowfile.schema_callbacks import pre_calculate_pivot_schema
1616

17-
1817
import pytest
1918
from pathlib import Path
2019
from typing import List, Dict, Literal
2120
from copy import deepcopy
2221
from time import sleep
2322

23+
def find_parent_directory(target_dir_name,):
24+
"""Navigate up directories until finding the target directory"""
25+
current_path = Path(__file__)
26+
27+
while current_path != current_path.parent:
28+
if current_path.name == target_dir_name:
29+
return current_path
30+
if current_path.name == target_dir_name:
31+
return current_path
32+
current_path = current_path.parent
33+
34+
raise FileNotFoundError(f"Directory '{target_dir_name}' not found")
35+
2436
try:
2537
from tests.flowfile_core_test_utils import (is_docker_available, ensure_password_is_available)
2638
from tests.utils import ensure_cloud_storage_connection_is_available_and_get_connection
@@ -254,15 +266,22 @@ def test_opening_parquet_file(flow_logger: FlowLogger):
254266
def test_running_performance_mode():
255267
graph = create_graph()
256268
from flowfile_core.configs.settings import OFFLOAD_TO_WORKER
269+
from flowfile_core.configs.settings import OFFLOAD_TO_WORKER
257270
add_node_promise_on_type(graph, 'read', 1, 1)
258271
from flowfile_core.configs.flow_logger import main_logger
272+
received_table = input_schema.ReceivedTable(
273+
file_type='parquet', name='table.parquet',
274+
path=str(find_parent_directory("Flowfile")/'flowfile_core/tests/support_files/data/table.parquet'))
275+
from flowfile_core.configs.flow_logger import main_logger
259276
received_table = input_schema.ReceivedTable(
260277
file_type='parquet', name='table.parquet',
261278
path=str(find_parent_directory("Flowfile")/'flowfile_core/tests/support_files/data/table.parquet'))
262279
node_read = input_schema.NodeRead(flow_id=1, node_id=1, cache_data=False, received_file=received_table)
263280
graph.add_read(node_read)
264281
main_logger.warning(str(graph))
265282
main_logger.warning(OFFLOAD_TO_WORKER)
283+
main_logger.warning(str(graph))
284+
main_logger.warning(OFFLOAD_TO_WORKER)
266285
add_node_promise_on_type(graph, 'record_count', 2)
267286
connection = input_schema.NodeConnection.create_from_simple_input(1, 2)
268287
add_connection(graph, connection)
@@ -274,6 +293,7 @@ def test_running_performance_mode():
274293
graph.flow_settings.execution_mode = 'Development'
275294
slow = graph.run_graph()
276295

296+
277297
assert slow.node_step_result[1].run_time > fast.node_step_result[1].run_time, 'Performance mode should be faster'
278298

279299

@@ -325,11 +345,8 @@ def test_add_fuzzy_match():
325345

326346

327347
def test_add_fuzzy_match_lcoal():
328-
from flowfile_core.configs.settings import OFFLOAD_TO_WORKER
329-
330348
graph = create_graph()
331349
graph.flow_settings.execution_location = "local"
332-
OFFLOAD_TO_WORKER.value = False
333350
input_data = [{'name': 'eduward'},
334351
{'name': 'edward'},
335352
{'name': 'courtney'}]
@@ -356,7 +373,6 @@ def test_add_fuzzy_match_lcoal():
356373
'name_vs_name_right_levenshtein': [1.0, 0.8571428571428572, 1.0, 0.8571428571428572, 1.0]}
357374
)
358375
output_data.assert_equal(expected_data)
359-
OFFLOAD_TO_WORKER.value = True
360376

361377

362378
def test_add_record_count():
@@ -1164,12 +1180,16 @@ def tracking_method(*args, **kwargs):
11641180
handle_run_info(result)
11651181

11661182

1183+
@pytest.mark.skipif(not is_docker_available(), reason="Docker is not available or not running so database reader cannot be tested")
11671184
@pytest.mark.skipif(not is_docker_available(), reason="Docker is not available or not running so database reader cannot be tested")
11681185
def test_complex_cloud_write_scenario():
11691186

1187+
11701188
ensure_cloud_storage_connection_is_available_and_get_connection()
11711189
handler = FlowfileHandler()
11721190

1191+
flow_id = handler.import_flow(find_parent_directory("Flowfile") / "flowfile_core/tests/support_files/flows/test_cloud_local.flowfile")
1192+
11731193
flow_id = handler.import_flow(find_parent_directory("Flowfile") / "flowfile_core/tests/support_files/flows/test_cloud_local.flowfile")
11741194
graph = handler.get_flow(flow_id)
11751195
node= graph.get_node(3)
@@ -1194,26 +1214,6 @@ def test_no_re_calculate_example_data_after_change_no_run():
11941214
add_connection(graph, input_schema.NodeConnection.create_from_simple_input(from_id=1, to_id=3))
11951215
graph.run_graph()
11961216

1197-
1198-
def test_no_re_calculate_example_data_after_change_no_run():
1199-
from flowfile_core.configs.settings import OFFLOAD_TO_WORKER
1200-
1201-
OFFLOAD_TO_WORKER.value = False
1202-
1203-
graph = get_dependency_example()
1204-
graph.flow_settings.execution_location = "local"
1205-
graph.run_graph()
1206-
graph.add_formula(
1207-
input_schema.NodeFormula(
1208-
flow_id=1,
1209-
node_id=3,
1210-
function=transform_schema.FunctionInput(transform_schema.FieldInput(name="titleCity"),
1211-
function="titlecase([city])"),
1212-
)
1213-
)
1214-
add_connection(graph, input_schema.NodeConnection.create_from_simple_input(from_id=1, to_id=3))
1215-
graph.run_graph()
1216-
12171217
first_data = [row["titleCity"] for row in graph.get_node_data(3, True).main_output.data]
12181218
assert len(first_data) > 0, 'Data should be present'
12191219
graph.add_formula(
@@ -1235,12 +1235,8 @@ def test_no_re_calculate_example_data_after_change_no_run():
12351235

12361236
assert after_change_data_after_run != first_data, 'Data should be different after run'
12371237

1238-
OFFLOAD_TO_WORKER.value = True
1239-
12401238

12411239
def test_add_fuzzy_match_only_local():
1242-
from flowfile_core.configs.settings import OFFLOAD_TO_WORKER
1243-
OFFLOAD_TO_WORKER.value = False
12441240
graph = create_graph()
12451241
graph.flow_settings.execution_location = "local"
12461242
input_data = [{'name': 'eduward'},
@@ -1269,7 +1265,40 @@ def test_add_fuzzy_match_only_local():
12691265
'name_vs_name_right_levenshtein': [1.0, 0.8571428571428572, 1.0, 1.0, 0.8571428571428572]}
12701266
)
12711267
output_data.assert_equal(expected_data)
1272-
OFFLOAD_TO_WORKER.value = True
1268+
1269+
1270+
def test_changes_execution_mode(flow_logger):
1271+
settings = {'flow_id': 1, 'node_id': 1, 'pos_x': 304.8727272727273,
1272+
'pos_y': 549.5272727272727, 'is_setup': True, 'description': 'Test csv',
1273+
'received_file': {'id': None, 'name': 'fake_data.csv',
1274+
'path': str(find_parent_directory("Flowfile")/'flowfile_core/tests/support_files/data/fake_data.csv'),
1275+
'directory': None, 'analysis_file_available': False, 'status': None,
1276+
'file_type': 'csv', 'fields': [], 'reference': '', 'starting_from_line': 0,
1277+
'delimiter': ',', 'has_headers': True, 'encoding': 'utf-8', 'parquet_ref': None,
1278+
'row_delimiter': '', 'quote_char': '', 'infer_schema_length': 20000,
1279+
'truncate_ragged_lines': False, 'ignore_errors': False, 'sheet_name': None,
1280+
'start_row': 0, 'start_column': 0, 'end_row': 0, 'end_column': 0,
1281+
'type_inference': False}}
1282+
graph = create_graph()
1283+
flow_logger.warning(str(graph))
1284+
add_node_promise_on_type(graph, 'read', 1)
1285+
input_file = input_schema.NodeRead(**settings)
1286+
graph.add_read(input_file)
1287+
run_info = graph.run_graph()
1288+
handle_run_info(run_info)
1289+
graph.add_select(select_settings=input_schema.NodeSelect(flow_id=1, node_id=2,
1290+
select_input=[transform_schema.SelectInput("City")],
1291+
keep_missing=True))
1292+
add_connection(graph, input_schema.NodeConnection.create_from_simple_input(1, 2))
1293+
explain_node_2 = graph.get_node(2).get_resulting_data().data_frame.explain()
1294+
assert "flowfile_core/tests/support_files/data/fake_data.csv" not in explain_node_2
1295+
graph.execution_location = "local"
1296+
1297+
explain_node_2 = graph.get_node(2).get_resulting_data().data_frame.explain()
1298+
# now it should read from the actual source, since we do not cache the data with the external worker
1299+
1300+
assert "flowfile_core/tests/support_files/data/fake_data.csv" in explain_node_2
1301+
12731302

12741303

12751304
def test_fuzzy_match_schema_predict(flow_logger):

flowfile_frame/flowfile_frame/__init__.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
# flowframe/__init__.py
22
"""A Polars-like API for building ETL graphs."""
33

4-
from flowfile_core.configs.settings import OFFLOAD_TO_WORKER
5-
OFFLOAD_TO_WORKER.value = False
6-
74
# Core classes
85
from flowfile_frame.flow_frame import FlowFrame # noqa: F401
96
from pl_fuzzy_frame_match.models import FuzzyMapping # noqa: F401

0 commit comments

Comments
 (0)