diff --git a/docs/assets/images/ui/node_settings_formula.png b/docs/assets/images/ui/node_settings_formula.png new file mode 100644 index 00000000..bed6c4bd Binary files /dev/null and b/docs/assets/images/ui/node_settings_formula.png differ diff --git a/docs/for-developers/design-philosophy.md b/docs/for-developers/design-philosophy.md index 0c71a7ad..e79bc84e 100644 --- a/docs/for-developers/design-philosophy.md +++ b/docs/for-developers/design-philosophy.md @@ -446,6 +446,7 @@ result = flow.get_node(final_node_id).get_resulting_data() ``` **Characteristics:** + - ⚡ Pull-based execution from the final node - 🎯 Polars optimizes the entire pipeline - 💨 Data flows once through optimized plan diff --git a/docs/index.html b/docs/index.html index 377a32a2..a957f70b 100644 --- a/docs/index.html +++ b/docs/index.html @@ -2,7 +2,7 @@ - + Flowfile - Visual ETL Tool @@ -52,6 +52,8 @@ color: var(--md-default-fg-color); background-color: var(--md-default-bg-color); line-height: 1.6; + -webkit-font-smoothing: antialiased; + -moz-osx-font-smoothing: grayscale; } pre { @@ -59,24 +61,28 @@ padding: 1rem; border-radius: 8px; overflow-x: auto; + -webkit-overflow-scrolling: touch; + font-size: 0.875rem; } code { font-family: 'Source Code Pro', 'Roboto Mono', monospace; color: var(--md-code-fg-color); + font-size: 0.875rem; } - - @@ -622,7 +869,6 @@

🐍 Code Approach

-
Python API Documentation @@ -631,51 +877,51 @@

🐍 Code Approach

Visual Editor Guide
-
+
Read Developer Docs - +
-
-

See It For Yourself

-

- Start rediscovering how we bridge the gap between business users and technical users. -

- -
- -
5 min
-
to first pipeline
-
- -
100%
-
Familiar API
-
- -
0
-
vendor lock-in
-
-
-
- - Try Interactive Tutorial - - - Browse Examples - -
+ +
+

See It For Yourself

+

+ Discover how we bridge the gap between business users and technical users. +

+ + + + + +

+ Free, open-source and customizable +

+
-

- Free, open-source and customizable -

-
\ No newline at end of file diff --git a/docs/quickstart.md b/docs/quickstart.md index fd54f99b..4edaf695 100644 --- a/docs/quickstart.md +++ b/docs/quickstart.md @@ -1,16 +1,16 @@ # Quick Start Guide -
- Flowfile Logo -

Get Started with Flowfile in 5 Minutes

+
+ Flowfile Logo +

Get Started with Flowfile in 5 Minutes

## Installation -
-

Recommended: Install from PyPI

-
pip install flowfile
-

This installs everything you need - the Python API, visual editor, and all services.

+
+

Recommended quickstart: Install from PyPI

+
pip install flowfile
+

This installs everything you need - the Python API, visual editor, and all services.
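A quick way to confirm the install worked; this sketch relies only on the `__version__` attribute defined in `flowfile/__init__.py` (bumped to 0.3.8 in this PR):

```python
# Post-install smoke test: the import should succeed and report the packaged version.
import flowfile as ff

print(ff.__version__)  # "0.3.8" after this release
```

From a terminal, `flowfile run ui` (the same command shown with `FLOWFILE_PORT` in the troubleshooting section further down) then launches the visual editor.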

### Alternative Installation Methods @@ -53,46 +53,46 @@ npm run dev:web # Terminal 3 (port 8080) ## Choose Your Path -
+
-
-

Non-Technical Users

-

Perfect for: Analysts, business users, Excel power users

-

No coding required!

+
+

Non-Technical Users

+

Perfect for: Analysts, business users, Excel power users

+

No coding required!

-
    -
  • ✅ Drag and drop interface
  • -
  • ✅ Visual data preview
  • -
  • ✅ Export to Excel/CSV
  • -
  • ✅ Built-in transformations
  • +
      +
    • ✅ Drag and drop interface
    • +
    • ✅ Visual data preview
    • +
    • ✅ Export to Excel/CSV
    • +
    • ✅ Built-in transformations
    - Start Visual Tutorial → + Start Visual Tutorial →
-
-

Technical Users

-

Perfect for: Developers, data scientists, engineers

-

Full programmatic control!

+
+

Technical Users

+

Perfect for: Developers, data scientists, engineers

+

Full programmatic control!

-
    -
  • ✅ Polars-compatible API
  • -
  • ✅ Cloud storage integration
  • -
  • ✅ Version control friendly
  • -
  • ✅ Complex dynamic logic
  • +
      +
    • ✅ Polars-compatible API
    • +
    • ✅ Cloud storage integration
    • +
    • ✅ Version control friendly
    • +
    • ✅ Complex dynamic logic
    - Start Python Tutorial → + Start Python Tutorial →
--- -## 🎨 Quick Start for Non-Technical Users {#non-technical-quickstart} +## Quick Start for Non-Technical Users {#non-technical-quickstart} -
-Goal: Clean and analyze sales data without writing any code +
+Goal: Clean and analyze sales data without writing any code
### Step 1: Start Flowfile, and create a Flow @@ -106,7 +106,7 @@ Your browser should automatically open to the Flowfile UI. !!! warning "If the browser does not open automatically" If the browser does not open automatically, you can manually navigate to [http://127.0.0.1:63578/ui#/main/designer](http://127.0.0.1:63578/ui#/main/designer) in your web browser. -
+
**Creating your First Flow:** @@ -124,7 +124,7 @@ You should now see an empty flow: ### Step 2: Load Your Data -
+
**Loading a CSV or Excel file:** @@ -144,11 +144,11 @@ You should now see an empty flow: Let's remove duplicate records and filter for high-value transactions: -
+
-
-

Remove Duplicates

-
    +
    +

    Remove Duplicates

    +
    1. Drag "Drop Duplicates" node from Transform section
    2. Connect it to your Read Data node
    3. Select columns to check for duplicates
    4. @@ -156,9 +156,9 @@ Let's remove duplicate records and filter for high-value transactions:
    -
    -

    Filter Data

    -
      +
      +

      Filter Data

      +
      1. Drag "Filter Data" node from Transform section
      2. Connect it to Drop Duplicates node
      3. Enter formula: [Quantity] > 7
      4. @@ -191,7 +191,7 @@ Let's remove duplicate records and filter for high-value transactions: ### Step 5: Save Your Results -
        +
        **Export your cleaned data:** @@ -206,7 +206,7 @@ Let's remove duplicate records and filter for high-value transactions:
        -
        +
        ### Here's what your complete flow should look like: @@ -222,9 +222,9 @@ You've just built your first data pipeline! You can: - **Schedule it** to run automatically (coming soon) - **Export as Python code** if you want to see what's happening behind the scenes -
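As a small taste of that behind-the-scenes view, the Drop Duplicates step from this walkthrough roughly corresponds to the graph-level calls below. This is a sketch modelled on the test code added elsewhere in this PR; the pre-existing `graph` object, the `transform_schema` import path, and the `OrderID` column are assumptions for illustration.

```python
# Sketch: the "Drop Duplicates" node expressed against the FlowGraph API,
# mirroring the add_node_promise / add_connection / add_unique calls in this PR's tests.
from flowfile_core.flowfile.flow_graph import FlowGraph, add_connection
from flowfile_core.schemas import input_schema, transform_schema  # import path assumed


def add_drop_duplicates(graph: FlowGraph) -> None:
    """Attach a Drop Duplicates (unique) node to node 1 of an existing graph."""
    graph.add_node_promise(input_schema.NodePromise(flow_id=1, node_id=2, node_type="unique"))
    add_connection(graph, input_schema.NodeConnection.create_from_simple_input(from_id=1, to_id=2))
    graph.add_unique(
        input_schema.NodeUnique(
            flow_id=1,
            node_id=2,
            unique_input=transform_schema.UniqueInput(columns=["OrderID"]),  # hypothetical column
        )
    )
    graph.run_graph()
```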
        -### Pro Tips for Non-Technical Users: -
          +
          +

          Pro Tips for Non-Technical Users:

          +
          • Use descriptions: Right-click nodes and add descriptions to document your work
          • Preview often: Click nodes after running to see data at each step
          • Start small: Test with a sample of your data first
          • @@ -234,18 +234,18 @@ You've just built your first data pipeline! You can: ### Next Steps -
            - Complete Visual Guide - Learn All Nodes - Connect to Databases + --- ## Quick Start for Technical Users {#technical-quickstart} -
            -Goal: Build a production-ready ETL pipeline with cloud integration +
            +Goal: Build a production-ready ETL pipeline with cloud integration
            ### Step 1: Install and Import @@ -284,6 +284,8 @@ ff.create_cloud_storage_connection_if_not_exists( ### Step 3: Extract and Transform +
            + ```python # Build the pipeline (lazy evaluation - no data loaded yet!) import flowfile as ff @@ -360,6 +362,8 @@ pipeline = ( print(pipeline.explain()) # Shows optimized Polars query plan ``` +
            + ### Step 4: Load and Monitor ```python @@ -384,11 +388,11 @@ df_result = pipeline.collect() # NOW it executes everything! print(f"Processed {len(df_result):,} aggregated records") print(df_result.head()) ``` -### Step 5: Advanced Features +### Step 5: Advanced Features -
            -

            Visualize Your Pipeline

            +
            +

            Visualize Your Pipeline

            ```python # Open in visual editor @@ -399,7 +403,7 @@ ff.open_graph_in_editor(pipeline.flow_graph) ```
            - Visual overview of pipeline + Visual overview of pipeline ![Flow visualized](assets/images/quickstart/python_example.png) @@ -407,8 +411,8 @@ ff.open_graph_in_editor(pipeline.flow_graph)
            -
            -

            Export as Pure Python

            +
            +

            Export as Pure Python

            ```python # Generate standalone code @@ -419,10 +423,12 @@ code = pipeline.flow_graph.generate_code() ```
            - ### Step 6: Production Patterns **Pattern 1: Data Quality Checks** + +
            + ```python from datetime import datetime import flowfile as ff @@ -457,7 +463,12 @@ def data_quality_pipeline(df: ff.FlowFrame) -> ff.FlowFrame: return clean_df ``` +
            + **Pattern 2: Incremental Processing** + +
            + ```python # Read only new data since last run from datetime import datetime @@ -485,7 +496,12 @@ incremental_pipeline.write_delta( ) ``` +
            + **Pattern 3: Multi-Source Join** + +
            + ```python # Combine data from multiple sources import flowfile as ff @@ -524,39 +540,41 @@ enriched_orders = ( results = enriched_orders.collect() ``` +
            + ### Next Steps for Technical Users -
            - Complete API Reference - Architecture Deep Dive - Core Internals - Polars Documentation + --- ## 🌟 Why Flowfile? -
            +
            -
            -

            ⚡ Performance

            -

            Built on Polars - Uses the speed of Polars

            +
            +

            ⚡ Performance

            +

Built on Polars - your pipelines get the full speed of Polars

            -
            -

            Dual Interface

            -

            Same pipeline works in both visual and code. Switch anytime, no lock-in.

            +
            +

            🔄 Dual Interface

            +

            Same pipeline works in both visual and code. Switch anytime, no lock-in.

            -
            -

            Export to Production

            -

            Generate pure Python/Polars code. Deploy anywhere without Flowfile.

            +
            +

            📦 Export to Production

            +

            Generate pure Python/Polars code. Deploy anywhere without Flowfile.

            -
            -

            Cloud Support

            -

            Direct S3/cloud storage support, no need for expensive clusters to analyse your data

            +
            +

            ☁️ Cloud Support

            +

Direct S3/cloud storage support - no need for expensive clusters to analyse your data

            @@ -596,18 +614,28 @@ FLOWFILE_PORT=8080 flowfile run ui ### Get Help -- **Documentation**: [Full Documentation](users/index.html) -- **Discussions**: [GitHub Discussions](https://github.com/Edwardvaneechoud/Flowfile/discussions) -- **Issues**: [GitHub Issues](https://github.com/Edwardvaneechoud/Flowfile/issues) -- **Email**: evaneechoud@gmail.com +
            +
            + Documentation
            + Full Documentation +
            +
            + Discussions
            + GitHub Discussions +
            +
            + Issues
            + GitHub Issues +
            +
            --- -
            -

            Ready to Transform Your Data?

            -

            Join thousands of users building data pipelines with Flowfile

            -
            - Start Visual (No Code) → - Start Coding (Python) → +
            +

            Ready to Transform Your Data?

            +

            Join thousands of users building data pipelines with Flowfile

            +
            \ No newline at end of file diff --git a/docs/users/python-api/quickstart.md b/docs/users/python-api/quickstart.md index 12197237..32732aaf 100644 --- a/docs/users/python-api/quickstart.md +++ b/docs/users/python-api/quickstart.md @@ -217,4 +217,5 @@ pipeline.write_parquet("yearly_sales.parquet") --- -*Ready for more? Check out the [full API reference](reference/index.md) or learn about [core concepts](concepts/design-concepts.md).* \ No newline at end of file +*Ready for more? Check out the [full API reference](reference/index.md) or learn about [core concepts](concepts/design-concepts.md).* +Or want to see another example? Checkout the [quickstart guide](../../quickstart.md#quick-start-for-technical-users-technical-quickstart)! diff --git a/docs/users/visual-editor/building-flows.md b/docs/users/visual-editor/building-flows.md index 47c9b290..e215876c 100644 --- a/docs/users/visual-editor/building-flows.md +++ b/docs/users/visual-editor/building-flows.md @@ -2,6 +2,8 @@ Flowfile allows you to create data pipelines visually by connecting nodes that represent different data operations. This guide will walk you through the process of creating and running flows. +!!! info "Looking for a quickstart overview?" + Check out our [Quick Start Guide](../../quickstart.md#quick-start-for-non-technical-users-non-technical-quickstart) to get up and running in minutes. ## Interface Overview @@ -14,7 +16,6 @@ Flowfile allows you to create data pipelines visually by connecting nodes that r - **Right sidebar**: Configure node settings and parameters - **Bottom panel**: Preview data at each step - ## Creating a Flow ![Flowfile Landing](../../assets/images/ui/landing.png){ width="1200px" } @@ -40,9 +41,25 @@ Flowfile allows you to create data pipelines visually by connecting nodes that r ## Configuring Nodes ### Node Settings -![Flowfile Interface Overview](../../assets/images/ui/node_settings.png){ width="1200px" } -1. Click any node to open its settings in the right sidebar -2. Each node type has specific configuration options: + +
            + +
            +

            Click any node on the canvas to open its settings in the right sidebar. Each node type has unique configuration options tailored to its function.

            +

            For example, the "Formula" node shown here includes sections for:

            +
              +
• 🎛️ General: Add a custom description of the node via the general settings
            • +
• ⚙️ Performance tweaking: Decide whether the node's results should be cached for better performance (set via the general settings)
            • +
• ↔️ Transformations: Define the formula to apply to the incoming data (see the code sketch after the screenshot below)
            • +
            +
            + +
            + Node settings panel showing configuration options for a Formula node +

            The settings panel for a "Formula" node.

            +
            + +
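The formula itself uses the bracketed column syntax seen elsewhere in these docs, for example `titlecase([city])` or `[Quantity] > 7`. For reference, roughly the same Formula node can also be defined programmatically; this sketch mirrors the `add_formula` call in the tests added by this PR, with the existing `graph`, node ids, column names, and the `transform_schema` import path assumed for illustration.

```python
# Sketch: a Formula node created in code, mirroring this PR's test_flowfile.py.
# It adds a "titleCity" column computed by the formula titlecase([city]).
from flowfile_core.flowfile.flow_graph import FlowGraph
from flowfile_core.schemas import input_schema, transform_schema  # import path assumed


def add_title_case_formula(graph: FlowGraph) -> None:
    """Add a Formula node that computes titleCity = titlecase([city])."""
    graph.add_formula(
        input_schema.NodeFormula(
            flow_id=1,
            node_id=3,
            function=transform_schema.FunctionInput(
                transform_schema.FieldInput(name="titleCity"),
                function="titlecase([city])",
            ),
        )
    )
```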
            ### Data Preview 1. After configuration, each node shows the output schema of the action @@ -52,9 +69,10 @@ Flowfile allows you to create data pipelines visually by connecting nodes that r ## Running Your Flow ### 1. Execution Options -Choose your xecution mode from the dropdown: - - **Development**: Lets you view the data in every step of the process, at the cost of performance - - **Performance**: Only executes steps needed for the output (e.g., writing data), allowing for query optimizations and better performance +Choose your execution mode from the settings panel: + +- **Development**: Lets you view the data in every step of the process, at the cost of performance +- **Performance**: Only executes steps needed for the output (e.g., writing data), allowing for query optimizations and better performance ### 2. Running the Flow 1. Click the **Run** button in the top toolbar @@ -102,15 +120,18 @@ Here's a typical flow that demonstrates common operations: - Use mouse wheel to zoom - Hold Shift to select multiple nodes - Right-click for context menu - - Right click on the text to add notes + - Right-click on the text to add notes - **Data Handling**: - Use sample nodes during development - Preview data frequently - - Check column types early - - Monitor memory usage + - Check column types early with *select* nodes + +--- --- +## Want to see another example? +Checkout the [quickstart guide](../../quickstart.md#quick-start-for-non-technical-users-non-technical-quickstart)! ## Next Steps diff --git a/flowfile/flowfile/__init__.py b/flowfile/flowfile/__init__.py index 1d735f7c..de7db989 100644 --- a/flowfile/flowfile/__init__.py +++ b/flowfile/flowfile/__init__.py @@ -7,7 +7,7 @@ - flowfile_worker: Computation engine """ -__version__ = "0.3.7" +__version__ = "0.3.8" import os import logging diff --git a/flowfile/flowfile/api.py b/flowfile/flowfile/api.py index 5b44c39d..cfbd2021 100644 --- a/flowfile/flowfile/api.py +++ b/flowfile/flowfile/api.py @@ -206,7 +206,7 @@ def check_if_in_single_mode() -> bool: try: response: requests.Response = requests.get(f"{FLOWFILE_BASE_URL}/single_mode", timeout=1) if response.ok: - return response.json() == "1" + return response.json() except Exception: pass return False @@ -400,6 +400,8 @@ def _open_flow_in_browser(flow_id: int) -> None: logger.info(f"Unified mode detected. 
Opening imported flow in browser: {flow_url}") try: time.sleep(0.5) + logger.info("Attempting to open browser tab for flow...") + logger.info("Opening URL in browser: %s", flow_url) webbrowser.open_new_tab(flow_url) except Exception as wb_err: logger.warning(f"Could not automatically open browser tab: {wb_err}") @@ -452,7 +454,7 @@ def open_graph_in_editor(flow_graph: FlowGraph, storage_location: Optional[str] return False flow_id = import_flow_to_editor(flow_file_path, auth_token) - + print(flow_id, "flow_id", flow_in_single_mode, automatically_open_browser) if flow_id is not None: if flow_in_single_mode and automatically_open_browser: _open_flow_in_browser(flow_id) diff --git a/flowfile/flowfile/web/__init__.py b/flowfile/flowfile/web/__init__.py index 28929a75..7c2443f8 100644 --- a/flowfile/flowfile/web/__init__.py +++ b/flowfile/flowfile/web/__init__.py @@ -51,6 +51,8 @@ async def svg_logo(): @app.get("/single_mode") async def in_single_mode() -> bool: + print("Checking if single file mode is enabled") + print(os.environ.get('FLOWFILE_SINGLE_FILE_MODE')) return os.environ.get('FLOWFILE_SINGLE_FILE_MODE', "0") == "1" @app.get("/ui", include_in_schema=False) diff --git a/flowfile_core/flowfile_core/configs/utils.py b/flowfile_core/flowfile_core/configs/utils.py index e12d6acb..4375da79 100644 --- a/flowfile_core/flowfile_core/configs/utils.py +++ b/flowfile_core/flowfile_core/configs/utils.py @@ -16,3 +16,8 @@ def __eq__(self, other) -> bool: elif isinstance(other, MutableBool): return self.value == other.value return NotImplemented + + def set(self, value): + """Set the value of the MutableBool""" + self.value = bool(value) + return self diff --git a/flowfile_core/flowfile_core/database/connection.py b/flowfile_core/flowfile_core/database/connection.py index 71a71bac..8117b90f 100644 --- a/flowfile_core/flowfile_core/database/connection.py +++ b/flowfile_core/flowfile_core/database/connection.py @@ -26,8 +26,6 @@ def get_app_data_dir() -> Path: base_dir = os.path.join(os.path.expanduser("~"), ".local", "share") app_dir = Path(base_dir) / app_name - - print(f"Using application data directory: {app_dir}") app_dir.mkdir(parents=True, exist_ok=True) return app_dir @@ -48,7 +46,7 @@ def get_database_url(): app_dir = get_app_data_dir() db_path = app_dir / "flowfile.db" - logger.info(f"Using database URL: sqlite:///{db_path}") + logger.debug(f"Using database URL: sqlite:///{db_path}") return f"sqlite:///{db_path}" diff --git a/flowfile_core/flowfile_core/flowfile/flow_data_engine/cloud_storage_reader.py b/flowfile_core/flowfile_core/flowfile/flow_data_engine/cloud_storage_reader.py index 19e689a9..194c8413 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_data_engine/cloud_storage_reader.py +++ b/flowfile_core/flowfile_core/flowfile/flow_data_engine/cloud_storage_reader.py @@ -68,7 +68,6 @@ def get_storage_options(connection: FullCloudStorageConnection) -> Dict[str, Any def _get_s3_storage_options(connection: 'FullCloudStorageConnection') -> Dict[str, Any]: """Build S3-specific storage options.""" auth_method = connection.auth_method - print(f"Building S3 storage options for auth_method: '{auth_method}'") if auth_method == "aws-cli": return create_storage_options_from_boto_credentials( profile_name=connection.connection_name, diff --git a/flowfile_core/flowfile_core/flowfile/flow_data_engine/flow_data_engine.py b/flowfile_core/flowfile_core/flowfile/flow_data_engine/flow_data_engine.py index dec8bf03..806dbada 100644 --- 
a/flowfile_core/flowfile_core/flowfile/flow_data_engine/flow_data_engine.py +++ b/flowfile_core/flowfile_core/flowfile/flow_data_engine/flow_data_engine.py @@ -1956,7 +1956,7 @@ def get_number_of_records(self, warn: bool = False, force_calculate: bool = Fals """ if self.is_future and not self.is_collected: return -1 - calculate_in_worker_process = False if not OFFLOAD_TO_WORKER.value else calculate_in_worker_process + calculate_in_worker_process = False if not OFFLOAD_TO_WORKER else calculate_in_worker_process if self.number_of_records is None or self.number_of_records < 0 or force_calculate: if self._number_of_records_callback is not None: self._number_of_records_callback(self) diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py index c5baa58a..62e05556 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_graph.py +++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py @@ -300,8 +300,6 @@ def print_tree(self, show_schema=False, show_descriptions=False): return print(tree) - - def apply_layout(self, y_spacing: int = 150, x_spacing: int = 200, initial_y: int = 100): """Calculates and applies a layered layout to all nodes in the graph. @@ -490,7 +488,8 @@ def analysis_preparation(flowfile_table: FlowDataEngine): node_id=node.node_id, flow_id=self.flow_id, ) - node.results.analysis_data_generator = get_read_top_n(external_sampler.status.file_ref) + node.results.analysis_data_generator = get_read_top_n(external_sampler.status.file_ref, + n=min(sample_size, number_of_records)) return flowfile_table def schema_callback(): @@ -1581,12 +1580,13 @@ def run_graph(self) -> RunInformation | None: execution_order = determine_execution_order(all_nodes=[node for node in self.nodes if node not in skip_nodes], flow_starts=self._flow_starts+self.get_implicit_starter_nodes()) - skip_node_message(self.flow_logger, skip_nodes) execution_order_message(self.flow_logger, execution_order) performance_mode = self.flow_settings.execution_mode == 'Performance' if self.flow_settings.execution_location == 'local': OFFLOAD_TO_WORKER.value = False + elif self.flow_settings.execution_location == 'remote': + OFFLOAD_TO_WORKER.value = True for node in execution_order: node_logger = self.flow_logger.get_node_logger(node.node_id) if self.flow_settings.is_canceled: diff --git a/flowfile_core/flowfile_core/flowfile/flow_node/flow_node.py b/flowfile_core/flowfile_core/flowfile/flow_node/flow_node.py index 52542605..52c51e4f 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_node/flow_node.py +++ b/flowfile_core/flowfile_core/flowfile/flow_node/flow_node.py @@ -5,7 +5,7 @@ from flowfile_core.utils.arrow_reader import get_read_top_n from flowfile_core.schemas import input_schema, schemas from flowfile_core.configs.flow_logger import NodeLogger -from flowfile_core.configs.settings import SINGLE_FILE_MODE +from flowfile_core.configs.settings import SINGLE_FILE_MODE, OFFLOAD_TO_WORKER from flowfile_core.schemas.output_model import TableExample, FileColumn, NodeData from flowfile_core.flowfile.utils import get_hash @@ -724,9 +724,19 @@ def execute_full_local(self, performance_mode: bool = False) -> None: Raises: Exception: Propagates exceptions from the execution. 
""" - if self.results.resulting_data is None and not performance_mode: - self.results.resulting_data = self.get_resulting_data() - self.results.example_data_generator = lambda: self.get_resulting_data().get_sample(100).to_arrow() + def example_data_generator(): + example_data = None + + def get_example_data(): + nonlocal example_data + if example_data is None: + example_data = resulting_data.get_sample(100).to_arrow() + return example_data + return get_example_data + resulting_data = self.get_resulting_data() + + if not performance_mode: + self.results.example_data_generator = example_data_generator() self.node_schema.result_schema = self.results.resulting_data.schema self.node_stats.has_completed_last_run = True @@ -899,8 +909,8 @@ def execute_node(self, run_location: schemas.ExecutionLocationsLiteral, reset_ca else: self.results.errors = str(e) node_logger.error(f'Error with running the node: {e}') - elif ((run_location == 'local' or SINGLE_FILE_MODE) and (not self.node_stats.has_run_with_current_setup - or self.node_template.node_group == "output")): + elif ((run_location == 'local' or SINGLE_FILE_MODE) and + (not self.node_stats.has_run_with_current_setup or self.node_template.node_group == "output")): try: node_logger.info('Executing fully locally') self.execute_full_local(performance_mode) diff --git a/flowfile_core/flowfile_core/utils/arrow_reader.py b/flowfile_core/flowfile_core/utils/arrow_reader.py index 48bafea4..75d21001 100644 --- a/flowfile_core/flowfile_core/utils/arrow_reader.py +++ b/flowfile_core/flowfile_core/utils/arrow_reader.py @@ -138,11 +138,16 @@ def collect_batches(reader: pa.ipc.RecordBatchFileReader, n: int) -> Tuple[List[ rows_collected = 0 for batch in iter_batches(reader, n, rows_collected): - batches.append(batch) + rows_collected += batch.num_rows logger.debug(f"Collected batch: total rows now {rows_collected}") if rows_collected >= n: + if rows_collected > n: + batches.append(batch.slice(0, n - (rows_collected - batch.num_rows))) + else: + batches.append(batch) break + batches.append(batch) logger.info(f"Finished collecting {len(batches)} batches with {rows_collected} total rows") return batches, rows_collected @@ -217,7 +222,7 @@ def read_top_n(file_path: str, n: int = 1000, strict: bool = False) -> pa.Table: table = pa.Table.from_batches(batches) # type: ignore logger.info(f"Successfully read {rows_collected} rows from {file_path}") - return table + return table def get_read_top_n(file_path: str, n: int = 1000, strict: bool = False) -> Callable[[], pa.Table]: @@ -244,4 +249,4 @@ def get_read_top_n(file_path: str, n: int = 1000, strict: bool = False) -> Calla >>> table = reader_func() """ logger.info(f"Creating reader function for {file_path} with n={n}, strict={strict}") - return lambda: read_top_n(file_path, n, strict) \ No newline at end of file + return lambda: read_top_n(file_path, n, strict) diff --git a/flowfile_core/flowfile_core/utils/validate_setup.py b/flowfile_core/flowfile_core/utils/validate_setup.py index ebbfc25b..21cf2df0 100644 --- a/flowfile_core/flowfile_core/utils/validate_setup.py +++ b/flowfile_core/flowfile_core/utils/validate_setup.py @@ -34,8 +34,6 @@ def validate_setup(): check_if_node_has_add_function_in_flow_graph(node) check_if_node_has_input_schema_definition(node) - print("All nodes have corresponding functions in FlowGraph and input schema definitions.") - if __name__ == "__main__": validate_setup() diff --git a/flowfile_core/tests/flowfile/analytics/test_analytics_processor.py 
b/flowfile_core/tests/flowfile/analytics/test_analytics_processor.py index dde7aef3..e97eaf22 100644 --- a/flowfile_core/tests/flowfile/analytics/test_analytics_processor.py +++ b/flowfile_core/tests/flowfile/analytics/test_analytics_processor.py @@ -4,6 +4,21 @@ from flowfile_core.flowfile.flow_graph import FlowGraph, add_connection, delete_connection from flowfile_core.schemas import input_schema, schemas from flowfile_core.flowfile.flow_data_engine.flow_data_engine import FlowDataEngine +from pathlib import Path + + +def find_parent_directory(target_dir_name, start_path=None): + """Navigate up directories until finding the target directory""" + current_path = Path(start_path) if start_path else Path.cwd() + + while current_path != current_path.parent: + if current_path.name == target_dir_name: + return current_path + if current_path.name == target_dir_name: + return current_path + current_path = current_path.parent + + raise FileNotFoundError(f"Directory '{target_dir_name}' not found") def get_starting_gw_node_settings() -> input_schema.NodeExploreData: @@ -512,3 +527,26 @@ def test_analytics_processor_from_parquet_file_run_performance(): node_step = graph.get_node(2) assert node_step.results.analysis_data_generator, 'The node should have to run' assert node_step.results.analysis_data_generator().__len__() == 1000, 'There should be 1000 rows in the data' + + +def test_analytics_processor_from_parquet_file_run_in_one_local_process(): + from flowfile_core.configs.settings import OFFLOAD_TO_WORKER + OFFLOAD_TO_WORKER.value = False + + graph = create_graph() + + graph.flow_settings.execution_location = "local" + add_node_promise_on_type(graph, 'read', 1, 1) + + received_table = input_schema.ReceivedTable(file_type='parquet', name='table.parquet', + path=str(Path(find_parent_directory('Flowfile'))/'flowfile_core/tests/support_files/data/large_table.parquet')) + node_read = input_schema.NodeRead(flow_id=1, node_id=1, received_file=received_table) + graph.add_read(node_read) + add_node_promise_on_type(graph, 'explore_data', 2, 1) + connection = input_schema.NodeConnection.create_from_simple_input(1, 2) + add_connection(graph, connection) + node_step = graph.get_node(2) + graph.run_graph() + assert node_step.results.analysis_data_generator, 'The node should have to run' + assert node_step.results.analysis_data_generator().__len__() == 10_000, 'There should be 1000 rows in the data' + OFFLOAD_TO_WORKER.value = True diff --git a/flowfile_core/tests/flowfile/test_flowfile.py b/flowfile_core/tests/flowfile/test_flowfile.py index f24c10c3..4dfb65a8 100644 --- a/flowfile_core/tests/flowfile/test_flowfile.py +++ b/flowfile_core/tests/flowfile/test_flowfile.py @@ -350,6 +350,25 @@ def test_add_read_excel(): graph.add_read(input_file=input_schema.NodeRead(**settings)) +def get_dependency_example(): + graph = create_graph() + graph = add_manual_input(graph, data=[{'name': 'John', 'city': 'New York'}, + {'name': 'Jane', 'city': 'Los Angeles'}, + {'name': 'Edward', 'city': 'Chicago'}, + {'name': 'Courtney', 'city': 'Chicago'}] +) + node_promise = input_schema.NodePromise(flow_id=1, node_id=2, node_type='unique') + graph.add_node_promise(node_promise) + + node_connection = input_schema.NodeConnection.create_from_simple_input(from_id=1, to_id=2) + add_connection(graph, node_connection) + input_file = input_schema.NodeUnique(flow_id=1, node_id=2, + unique_input=transform_schema.UniqueInput(columns=['city']) + ) + graph.add_unique(input_file) + return graph + + def ensure_excel_is_read_from_arrow_object(): 
settings = {'flow_id': 1, 'node_id': 1, 'cache_results': True, 'pos_x': 234.37272727272727, 'pos_y': 271.5272727272727, 'is_setup': True, 'description': '', @@ -1052,3 +1071,80 @@ def test_complex_cloud_write_scenario(): node= graph.get_node(3) node.get_table_example(True) graph.run_graph() + + +def test_no_re_calculate_example_data_after_change_no_run(): + from flowfile_core.configs.settings import OFFLOAD_TO_WORKER + + OFFLOAD_TO_WORKER.value = False + + graph = get_dependency_example() + graph.flow_settings.execution_location = "local" + graph.run_graph() + graph.add_formula( + input_schema.NodeFormula( + flow_id=1, + node_id=3, + function=transform_schema.FunctionInput(transform_schema.FieldInput(name="titleCity"), + function="titlecase([city])"), + ) + ) + add_connection(graph, input_schema.NodeConnection.create_from_simple_input(from_id=1, to_id=3)) + graph.run_graph() + + first_data = [row["titleCity"] for row in graph.get_node_data(3, True).main_output.data] + assert len(first_data) > 0, 'Data should be present' + graph.add_formula( + input_schema.NodeFormula( + flow_id=1, + node_id=3, + function=transform_schema.FunctionInput(transform_schema.FieldInput(name="titleCity"), + function="lowercase([city])"), + ) + ) + after_change_data_before_run = [row["titleCity"] for row in graph.get_node_data(3, True).main_output.data] + + assert after_change_data_before_run == first_data, 'Data should be the same after change without run' + assert not graph.get_node(3).node_stats.has_run_with_current_setup + assert graph.get_node(3).node_stats.has_completed_last_run + graph.run_graph() + assert graph.get_node(3).node_stats.has_run_with_current_setup + after_change_data_after_run = [row["titleCity"] for row in graph.get_node_data(3, True).main_output.data] + + assert after_change_data_after_run != first_data, 'Data should be different after run' + + OFFLOAD_TO_WORKER.value = True + +def test_add_fuzzy_match_only_local(): + from flowfile_core.configs.settings import OFFLOAD_TO_WORKER + OFFLOAD_TO_WORKER.value = False + graph = create_graph() + graph.flow_settings.execution_location = "local" + input_data = [{'name': 'eduward'}, + {'name': 'edward'}, + {'name': 'courtney'}] + add_manual_input(graph, data=input_data) + add_node_promise_on_type(graph, 'fuzzy_match', 2) + left_connection = input_schema.NodeConnection.create_from_simple_input(1, 2) + right_connection = input_schema.NodeConnection.create_from_simple_input(1, 2) + right_connection.input_connection.connection_class = 'input-1' + add_connection(graph, left_connection) + add_connection(graph, right_connection) + data = {'flow_id': 1, 'node_id': 2, 'cache_results': False, 'join_input': + {'join_mapping': [{'left_col': 'name', 'right_col': 'name', 'threshold_score': 75, 'fuzzy_type': 'levenshtein', + 'valid': True}], + 'left_select': {'renames': [{'old_name': 'name', 'new_name': 'name', 'join_key': True, }]}, + 'right_select': {'renames': [{'old_name': 'name', 'new_name': 'name', 'join_key': True, }]}, + 'how': 'inner'}, 'auto_keep_all': True, 'auto_keep_right': True, 'auto_keep_left': True} + graph.add_fuzzy_match(input_schema.NodeFuzzyMatch(**data)) + run_info = graph.run_graph() + handle_run_info(run_info) + output_data = graph.get_node(2).get_resulting_data() + expected_data = FlowDataEngine([{'name': 'eduward', 'fuzzy_score_0': 0.8571428571428572, 'name_right': 'edward'}, + {'name': 'edward', 'fuzzy_score_0': 1.0, 'name_right': 'edward'}, + {'name': 'eduward', 'fuzzy_score_0': 1.0, 'name_right': 'eduward'}, + {'name': 'edward', 
'fuzzy_score_0': 0.8571428571428572, 'name_right': 'eduward'}, + {'name': 'courtney', 'fuzzy_score_0': 1.0, 'name_right': 'courtney'}] + ) + output_data.assert_equal(expected_data) + OFFLOAD_TO_WORKER.value = True \ No newline at end of file diff --git a/flowfile_core/tests/support_files/data/large_table.parquet b/flowfile_core/tests/support_files/data/large_table.parquet new file mode 100644 index 00000000..d9f5b87e Binary files /dev/null and b/flowfile_core/tests/support_files/data/large_table.parquet differ diff --git a/flowfile_frame/flowfile_frame/expr.py b/flowfile_frame/flowfile_frame/expr.py index 0657aec7..22d995f8 100644 --- a/flowfile_frame/flowfile_frame/expr.py +++ b/flowfile_frame/flowfile_frame/expr.py @@ -490,6 +490,20 @@ def sum(self): result.agg_func = "sum" return result + def unique_counts(self): + """ + Return the number of unique values in the column. + + Returns + ------- + Expr + A new expression with the unique counts + """ + result_expr = self.expr.unique_counts() if self.expr is not None else None + result = self._create_next_expr(method_name="unique_counts", result_expr=result_expr, is_complex=self.is_complex) + result.agg_func = "unique_counts" + return result + def implode(self): result_expr = self.expr.implode() if self.expr is not None else None result = self._create_next_expr(method_name="implode", result_expr=result_expr, is_complex=self.is_complex) diff --git a/flowfile_frame/flowfile_frame/flow_frame.py b/flowfile_frame/flowfile_frame/flow_frame.py index 13571fa0..ad2e5c72 100644 --- a/flowfile_frame/flowfile_frame/flow_frame.py +++ b/flowfile_frame/flowfile_frame/flow_frame.py @@ -565,7 +565,7 @@ def join( coalesce: bool = None, maintain_order: Literal[None, "left", "right", "left_right", "right_left"] = None, description: str = None, - ): + ) -> "FlowFrame": """ Add a join operation to the Logical Plan. diff --git a/flowfile_frame/flowfile_frame/flow_frame.pyi b/flowfile_frame/flowfile_frame/flow_frame.pyi index 7c0b4c25..1c6df182 100644 --- a/flowfile_frame/flowfile_frame/flow_frame.pyi +++ b/flowfile_frame/flowfile_frame/flow_frame.pyi @@ -80,8 +80,8 @@ class FlowFrame: def __ne__(self, other: object) -> typing.NoReturn: ... - # Create and configure a new FlowFrame instance, mimicking Polars' flexible constructor. 
- def __new__(cls, data: typing.Union[LazyFrame, collections.abc.Mapping[str, typing.Union[collections.abc.Sequence[object], collections.abc.Mapping[str, collections.abc.Sequence[object]], ForwardRef('Series')]], collections.abc.Sequence[typing.Any], ForwardRef('np.ndarray[Any, Any]'), ForwardRef('pa.Table'), ForwardRef('pd.DataFrame'), ForwardRef('ArrowArrayExportable'), ForwardRef('ArrowStreamExportable'), ForwardRef('torch.Tensor')] = None, schema: typing.Union[collections.abc.Mapping[str, typing.Union[ForwardRef('DataTypeClass'), ForwardRef('DataType'), type[int], type[float], type[bool], type[str], type['date'], type['time'], type['datetime'], type['timedelta'], type[list[typing.Any]], type[tuple[typing.Any, ...]], type[bytes], type[object], type['Decimal'], type[None], NoneType]], collections.abc.Sequence[typing.Union[str, tuple[str, typing.Union[ForwardRef('DataTypeClass'), ForwardRef('DataType'), type[int], type[float], type[bool], type[str], type['date'], type['time'], type['datetime'], type['timedelta'], type[list[typing.Any]], type[tuple[typing.Any, ...]], type[bytes], type[object], type['Decimal'], type[None], NoneType]]]], NoneType] = None, schema_overrides: collections.abc.Mapping[str, typing.Union[ForwardRef('DataTypeClass'), ForwardRef('DataType')]] | None = None, strict: bool = True, orient: typing.Optional[typing.Literal['col', 'row']] = None, infer_schema_length: int | None = 100, nan_to_null: bool = False, flow_graph: typing.Optional[flowfile_core.flowfile.flow_graph.FlowGraph] = None, node_id: typing.Optional[int] = None, parent_node_id: typing.Optional[int] = None, override_initial: bool = False) -> Self: ... + # Unified constructor for FlowFrame. + def __new__(cls, data: typing.Union[LazyFrame, collections.abc.Mapping[str, typing.Union[collections.abc.Sequence[object], collections.abc.Mapping[str, collections.abc.Sequence[object]], ForwardRef('Series')]], collections.abc.Sequence[typing.Any], ForwardRef('np.ndarray[Any, Any]'), ForwardRef('pa.Table'), ForwardRef('pd.DataFrame'), ForwardRef('ArrowArrayExportable'), ForwardRef('ArrowStreamExportable'), ForwardRef('torch.Tensor')] = None, schema: typing.Union[collections.abc.Mapping[str, typing.Union[ForwardRef('DataTypeClass'), ForwardRef('DataType'), type[int], type[float], type[bool], type[str], type['date'], type['time'], type['datetime'], type['timedelta'], type[list[typing.Any]], type[tuple[typing.Any, ...]], type[bytes], type[object], type['Decimal'], type[None], NoneType]], collections.abc.Sequence[typing.Union[str, tuple[str, typing.Union[ForwardRef('DataTypeClass'), ForwardRef('DataType'), type[int], type[float], type[bool], type[str], type['date'], type['time'], type['datetime'], type['timedelta'], type[list[typing.Any]], type[tuple[typing.Any, ...]], type[bytes], type[object], type['Decimal'], type[None], NoneType]]]], NoneType] = None, schema_overrides: collections.abc.Mapping[str, typing.Union[ForwardRef('DataTypeClass'), ForwardRef('DataType')]] | None = None, strict: bool = True, orient: typing.Optional[typing.Literal['col', 'row']] = None, infer_schema_length: int | None = 100, nan_to_null: bool = False, flow_graph: typing.Optional[flowfile_core.flowfile.flow_graph.FlowGraph] = None, node_id: typing.Optional[int] = None, parent_node_id: typing.Optional[int] = None, **kwargs) -> Self: ... def __repr__(self) -> Any: ... @@ -118,9 +118,6 @@ class FlowFrame: # Execute join using Polars code approach. 
def _execute_polars_code_join(self, other: FlowFrame, new_node_id: int, on: typing.Union[typing.List[str | flowfile_frame.expr.Column], str, flowfile_frame.expr.Column], left_on: typing.Union[typing.List[str | flowfile_frame.expr.Column], str, flowfile_frame.expr.Column], right_on: typing.Union[typing.List[str | flowfile_frame.expr.Column], str, flowfile_frame.expr.Column], left_columns: typing.Optional[typing.List[str]], right_columns: typing.Optional[typing.List[str]], how: str, suffix: str, validate: str, nulls_equal: bool, coalesce: bool, maintain_order: typing.Literal[None, 'left', 'right', 'left_right', 'right_left'], description: str) -> 'FlowFrame': ... - # Internal constructor to create a FlowFrame instance that wraps an - def _from_existing_node(self, data: LazyFrame, flow_graph: FlowGraph, node_id: int, parent_node_id: typing.Optional[int] = None) -> 'FlowFrame': ... - # Generates the `input_df.sort(...)` Polars code string using pure expression strings. def _generate_sort_polars_code(self, pure_sort_expr_strs: typing.List[str], descending_values: typing.List[bool], nulls_last_values: typing.List[bool], multithreaded: bool, maintain_order: bool) -> str: ... @@ -231,7 +228,7 @@ class FlowFrame: def interpolate(self, description: Optional[str] = None) -> 'FlowFrame': ... # Add a join operation to the Logical Plan. - def join(self, other, on: typing.Union[typing.List[str | flowfile_frame.expr.Column], str, flowfile_frame.expr.Column] = None, how: str = 'inner', left_on: typing.Union[typing.List[str | flowfile_frame.expr.Column], str, flowfile_frame.expr.Column] = None, right_on: typing.Union[typing.List[str | flowfile_frame.expr.Column], str, flowfile_frame.expr.Column] = None, suffix: str = '_right', validate: str = None, nulls_equal: bool = False, coalesce: bool = None, maintain_order: typing.Literal[None, 'left', 'right', 'left_right', 'right_left'] = None, description: str = None) -> Any: ... + def join(self, other, on: typing.Union[typing.List[str | flowfile_frame.expr.Column], str, flowfile_frame.expr.Column] = None, how: str = 'inner', left_on: typing.Union[typing.List[str | flowfile_frame.expr.Column], str, flowfile_frame.expr.Column] = None, right_on: typing.Union[typing.List[str | flowfile_frame.expr.Column], str, flowfile_frame.expr.Column] = None, suffix: str = '_right', validate: str = None, nulls_equal: bool = False, coalesce: bool = None, maintain_order: typing.Literal[None, 'left', 'right', 'left_right', 'right_left'] = None, description: str = None) -> 'FlowFrame': ... # Perform an asof join. def join_asof(self, other: FlowFrame, left_on: str | None | Expr = None, right_on: str | None | Expr = None, on: str | None | Expr = None, by_left: str | Sequence[str] | None = None, by_right: str | Sequence[str] | None = None, by: str | Sequence[str] | None = None, strategy: AsofJoinStrategy = 'backward', suffix: str = '_right', tolerance: str | int | float | timedelta | None = None, allow_parallel: bool = True, force_parallel: bool = False, coalesce: bool = True, allow_exact_matches: bool = True, check_sortedness: bool = True, description: Optional[str] = None) -> 'FlowFrame': ... 
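One practical effect of the tightened `join(...) -> 'FlowFrame'` annotation in the stub above is that chained calls after a join keep their FlowFrame typing. A minimal sketch, with made-up column names, assuming the Polars-style constructor and `collect()` shown in the quickstart docs and that frames created separately can be joined directly, as in the multi-source join pattern:

```python
# Illustrative only: join now advertises a FlowFrame return type,
# so editors and type checkers can follow the chain after the join.
import flowfile as ff

orders = ff.FlowFrame({"customer_id": [1, 2, 2], "amount": [10.0, 20.0, 5.0]})
customers = ff.FlowFrame({"customer_id": [1, 2], "name": ["Ada", "Bob"]})

enriched = orders.join(customers, on="customer_id", how="inner")  # -> FlowFrame
print(enriched.collect())
```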
diff --git a/flowfile_frontend/package-lock.json b/flowfile_frontend/package-lock.json index a67b3afd..e8f21e6e 100644 --- a/flowfile_frontend/package-lock.json +++ b/flowfile_frontend/package-lock.json @@ -1,12 +1,12 @@ { "name": "Flowfile", - "version": "0.3.4", + "version": "0.3.7", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "Flowfile", - "version": "0.3.4", + "version": "0.3.7", "dependencies": { "@ag-grid-community/client-side-row-model": "^31.1.1", "@ag-grid-community/core": "^31.1.1", diff --git a/flowfile_frontend/package.json b/flowfile_frontend/package.json index 1f91b110..e07e6ebc 100644 --- a/flowfile_frontend/package.json +++ b/flowfile_frontend/package.json @@ -1,6 +1,6 @@ { "name": "Flowfile", - "version": "0.3.7", + "version": "0.3.8", "description": "A tool for designing and executing data flows", "main": "main/main.js", "scripts": { diff --git a/local_data/local_products.parquet b/local_data/local_products.parquet index 927f9442..067b839a 100644 Binary files a/local_data/local_products.parquet and b/local_data/local_products.parquet differ diff --git a/pyproject.toml b/pyproject.toml index a8cf6a10..f07b0908 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "Flowfile" -version = "0.3.7" +version = "0.3.8" description = "Project combining flowfile core (backend) and flowfile_worker (compute offloader) and flowfile_frame (api)" readme = "readme-pypi.md" authors = ["Edward van Eechoud "]