Skip to content

Commit b8c7018

Browse files
authored
[App] Wait for full file to be transferred in Path / Payload (#15934)
* Wait for full file to be transferred in Path / Payload * Fixes
1 parent 1283226 commit b8c7018

File tree

6 files changed

+26
-9
lines changed

6 files changed

+26
-9
lines changed

src/lightning_app/core/constants.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,3 +75,7 @@ def get_lightning_cloud_url() -> str:
7575

7676
def enable_multiple_works_in_default_container() -> bool:
7777
return bool(int(os.getenv("ENABLE_MULTIPLE_WORKS_IN_DEFAULT_CONTAINER", "0")))
78+
79+
80+
# Number of seconds to wait between filesystem checks when waiting for files in remote storage
81+
REMOTE_STORAGE_WAIT = 0.5

src/lightning_app/storage/orchestrator.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ def run_once(self, work_name: str) -> None:
105105
name=request.name,
106106
path=maybe_artifact_path,
107107
hash=request.hash,
108+
size=self.fs.info(maybe_artifact_path)["size"],
108109
destination=request.destination,
109110
)
110111
if isinstance(request, _ExistsRequest):
@@ -139,6 +140,7 @@ def run_once(self, work_name: str) -> None:
139140
path=request.path,
140141
name=request.name,
141142
hash=request.hash,
143+
size=0,
142144
destination=request.destination,
143145
)
144146
if isinstance(request, _ExistsRequest):

src/lightning_app/storage/path.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from fsspec import AbstractFileSystem
1111
from fsspec.implementations.local import LocalFileSystem
1212

13+
from lightning_app.core.constants import REMOTE_STORAGE_WAIT
1314
from lightning_app.core.queues import BaseQueue
1415
from lightning_app.storage.requests import _ExistsRequest, _ExistsResponse, _GetRequest, _GetResponse
1516
from lightning_app.utilities.app_helpers import Logger
@@ -199,9 +200,8 @@ def get(self, overwrite: bool = False) -> None:
199200
fs = _filesystem()
200201

201202
# 3. Wait until the file appears in shared storage
202-
while not fs.exists(response.path):
203-
# TODO: Existence check on folder is not enough, files may not be completely transferred yet
204-
sleep(0.5)
203+
while not fs.exists(response.path) or fs.info(response.path)["size"] != response.size:
204+
sleep(REMOTE_STORAGE_WAIT)
205205

206206
if self.exists_local() and self.is_dir():
207207
# Delete the directory, otherwise we can't overwrite it
@@ -340,10 +340,11 @@ def _handle_get_request(work: "LightningWork", request: _GetRequest) -> _GetResp
340340
destination_path = _shared_storage_path() / request.hash
341341
response = _GetResponse(
342342
source=request.source,
343+
name=request.name,
343344
path=str(destination_path),
344345
hash=request.hash,
346+
size=source_path.stat().st_size,
345347
destination=request.destination,
346-
name=request.name,
347348
)
348349

349350
try:

src/lightning_app/storage/payload.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from time import sleep
66
from typing import Any, Optional, TYPE_CHECKING, Union
77

8+
from lightning_app.core.constants import REMOTE_STORAGE_WAIT
89
from lightning_app.core.queues import BaseQueue
910
from lightning_app.storage.path import _filesystem, _shared_storage_path, Path
1011
from lightning_app.storage.requests import _ExistsRequest, _ExistsResponse, _GetRequest, _GetResponse
@@ -159,9 +160,8 @@ def get(self) -> Any:
159160
fs = _filesystem()
160161

161162
# 3. Wait until the file appears in shared storage
162-
while not fs.exists(response.path):
163-
# TODO: Existence check on folder is not enough, files may not be completely transferred yet
164-
sleep(0.5)
163+
while not fs.exists(response.path) or fs.info(response.path)["size"] != response.size:
164+
sleep(REMOTE_STORAGE_WAIT)
165165

166166
# 4. Copy the file from the shared storage to the destination on the local filesystem
167167
local_path = self._path
@@ -234,6 +234,7 @@ def _handle_get_request(work: "LightningWork", request: _GetRequest) -> _GetResp
234234
try:
235235
payload = getattr(work, request.name)
236236
payload.save(payload.value, source_path)
237+
response.size = source_path.stat().st_size
237238
_copy_files(source_path, destination_path)
238239
_logger.debug(f"All files copied from {request.path} to {response.path}.")
239240
except Exception as e:

src/lightning_app/storage/requests.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ class _GetResponse:
1717
name: str
1818
path: str
1919
hash: str
20+
size: int = 0
2021
destination: str = ""
2122
exception: Optional[Exception] = None
2223
timedelta: Optional[float] = None

tests/tests_app/storage/test_copier.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,13 @@ def _handle_exists_request(work, request):
2222
return Path._handle_exists_request(work, request)
2323

2424

25+
@mock.patch("lightning_app.storage.path.pathlib.Path.is_dir")
26+
@mock.patch("lightning_app.storage.path.pathlib.Path.stat")
2527
@mock.patch("lightning_app.storage.copier._filesystem")
26-
def test_copier_copies_all_files(fs_mock, tmpdir):
28+
def test_copier_copies_all_files(fs_mock, stat_mock, dir_mock, tmpdir):
2729
"""Test that the Copier calls the copy with the information provided in the request."""
30+
stat_mock().st_size = 0
31+
dir_mock.return_value = False
2832
copy_request_queue = _MockQueue()
2933
copy_response_queue = _MockQueue()
3034
work = mock.Mock()
@@ -38,9 +42,13 @@ def test_copier_copies_all_files(fs_mock, tmpdir):
3842
fs_mock().put.assert_called_once_with("file", tmpdir / ".shared" / "123")
3943

4044

41-
def test_copier_handles_exception(monkeypatch):
45+
@mock.patch("lightning_app.storage.path.pathlib.Path.is_dir")
46+
@mock.patch("lightning_app.storage.path.pathlib.Path.stat")
47+
def test_copier_handles_exception(stat_mock, dir_mock, monkeypatch):
4248
"""Test that the Copier captures exceptions from the file copy and forwards them through the queue without
4349
raising it."""
50+
stat_mock().st_size = 0
51+
dir_mock.return_value = False
4452
copy_request_queue = _MockQueue()
4553
copy_response_queue = _MockQueue()
4654
fs = mock.Mock()

0 commit comments

Comments
 (0)