Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions docs/source-app/examples/dag/dag_from_scratch.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,9 @@ First, let's define the component we need:
:lines: 55-79

And its run method executes the steps described above.
Additionally, ``work.stop`` is used to reduce cost when running in the cloud.

.. literalinclude:: ../../../examples/app_dag/app.py
:lines: 81-108
:lines: 80-103

----

Expand All @@ -51,4 +50,4 @@ Step 2: Define the scheduling
*****************************

.. literalinclude:: ../../../examples/app_dag/app.py
:lines: 109-137
:lines: 106-135
6 changes: 2 additions & 4 deletions examples/app_dag/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class DAG(L.LightningFlow):

"""This component is a DAG."""

def __init__(self, models_paths):
def __init__(self, models_paths: list):
super().__init__()
# Step 1: Create a work to get the data.
self.data_collector = GetDataWork()
Expand All @@ -80,12 +80,10 @@ def __init__(self, models_paths):
def run(self):
# Step 1 and 2: Download and process the data.
self.data_collector.run()
self.data_collector.stop() # Stop the data_collector to reduce cost
self.processing.run(
df_data=self.data_collector.df_data,
df_target=self.data_collector.df_target,
)
self.processing.stop() # Stop the processing to reduce cost

# Step 3: Launch n models training in parallel.
for model, work in self.dict.items():
Expand Down Expand Up @@ -128,7 +126,7 @@ def run(self):
app = L.LightningApp(
ScheduledDAG(
DAG,
models=[
models_paths=[
"svm.SVR",
"linear_model.LinearRegression",
"tree.DecisionTreeRegressor",
Expand Down
3 changes: 3 additions & 0 deletions src/lightning_app/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
- Resolved a bug where the `install` command was not installing the latest version of an app/component by default ([#14181](https://github.com/Lightning-AI/lightning/pull/14181))


- Fixed the `examples/app_dag` example ([#14359](https://github.com/Lightning-AI/lightning/pull/14359))


## [0.5.5] - 2022-08-9

### Deprecated
Expand Down
2 changes: 1 addition & 1 deletion src/lightning_app/testing/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ def run_cli(args) -> Generator:
def run_app_in_cloud(
app_folder: str, app_name: str = "app.py", extra_args: List[str] = [], debug: bool = True
) -> Generator:
"""This utility is used to automate testing e2e application with lightning_app.ai."""
"""This utility is used to automate testing e2e application with lightning.ai."""
# 1. Validate the provide app_folder is correct.
if not os.path.exists(os.path.join(app_folder, "app.py")):
raise Exception("The app folder should contain an app.py file.")
Expand Down
21 changes: 21 additions & 0 deletions tests/tests_app_examples/test_app_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import os
from time import sleep

import pytest
from tests_app import _PROJECT_ROOT

from lightning_app.testing.testing import run_app_in_cloud


@pytest.mark.cloud
def test_app_dag_example_cloud() -> None:
with run_app_in_cloud(os.path.join(_PROJECT_ROOT, "examples/app_dag")) as (_, _, fetch_logs, _):

launch_log, finish_log = False, False
while not (launch_log and finish_log):
for log in fetch_logs(["flow"]):
if "Launching a new DAG" in log:
launch_log = True
elif "Finished training and evaluating" in log:
finish_log = True
sleep(1)