[App] Multiprocessing-safe work pickling #15836
Merged
32 commits
46871ed safe work pickling
e40df25 [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot])
fef035d mypy
9fe8bfd Update tests/tests_app/utilities/test_safe_pickle.py
b041d4e changelog
2c77e46 Merge branch 'safe_pickle' of github.com:Lightning-AI/lightning into …
8732ef4 trying to fix ubuntu test
4a7dd7d more typing fixes
5723995 trying to fix ubuntu test
22c1625 Update src/lightning_app/CHANGELOG.md
6a3fce3 Update src/lightning_app/utilities/safe_pickle.py
8ebc36d Merge branch 'master' into safe_pickle
26003eb Merge branch 'master' into safe_pickle
0443f4e Merge branch 'master' into safe_pickle
8ac4974 Merge branch 'master' into safe_pickle
1333e9b trim work args that has mp queue reference
1025aef [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot])
63f46ba mypy
d06e564 Merge branch 'safe_pickle' of github.com:Lightning-AI/lightning into …
8c955a2 review
f16735f Update src/lightning_app/utilities/safe_pickle.py
7e0aff7 Merge branch 'master' into safe_pickle
76b3ca6 [pre-commit.ci] auto fixes from pre-commit.com hooks (pre-commit-ci[bot])
16a8742 Merge branch 'master' into safe_pickle (tchaton)
76e9992 Merge branch 'master' into safe_pickle (Borda)
63bafb6 Merge branch 'master' into safe_pickle
d6c553f Merge branch 'master' into safe_pickle
6f20522 Merge branch 'master' into safe_pickle (akihironitta)
4ac4eaa Merge branch 'master' into safe_pickle
52520b5 Merge branch 'master' into safe_pickle (Borda)
8f1b2b1 Merge branch 'master' into safe_pickle
1d41879 Merge branch 'master' into safe_pickle (Borda)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,95 @@ | ||
| import contextlib | ||
| import pickle | ||
| import sys | ||
| import types | ||
| import typing | ||
| from copy import deepcopy | ||
| from pathlib import Path | ||
|
|
||
| from lightning_app.core.work import LightningWork | ||
| from lightning_app.utilities.app_helpers import _LightningAppRef | ||
|
|
||
| NON_PICKLABLE_WORK_ATTRIBUTES = ["_request_queue", "_response_queue", "_backend", "_setattr_replacement"] | ||
|
|
||
|
|
||
| @contextlib.contextmanager | ||
| def _trimmed_work(work: LightningWork, to_trim: typing.List[str]) -> typing.Iterator[None]: | ||
| """Context manager that temporarily sets the given non-picklable attributes of the work to None and restores them on exit.""" | ||
| holder = {} | ||
| for arg in to_trim: | ||
| holder[arg] = getattr(work, arg) | ||
| setattr(work, arg, None) | ||
| yield | ||
| for arg in to_trim: | ||
| setattr(work, arg, holder[arg]) | ||
|
|
||
|
|
||
| def get_picklable_work(work: LightningWork) -> LightningWork: | ||
| """Pickling a LightningWork instance fails if done from the work process | ||
| itself. This function is safe to call from the work process within both MultiprocessRuntime | ||
| and Cloud. | ||
| Note: This function modifies the module information of the work object. Specifically, it injects | ||
| the relative module path into the __module__ attribute of the work object. If the object is not | ||
| importable from the CWD, then the pickle load will fail. | ||
|
|
||
| Example: | ||
| For the directory structure below, where the work class is defined in app.py and | ||
| app.py is the entrypoint for the app, `foo.bar.app` is injected into the | ||
| __module__ attribute. | ||
|
|
||
| └── foo | ||
| ├── __init__.py | ||
| └── bar | ||
| └── app.py | ||
| """ | ||
|
|
||
| # If the work object is not taken from the app ref, a thread lock reference | ||
| # somewhere prevents it from being pickled. Investigate this later; we | ||
| # shouldn't be fetching the work object from the app ref. TODO @sherin | ||
| app_ref = _LightningAppRef.get_current() | ||
| if app_ref is None: | ||
| raise RuntimeError("Cannot pickle LightningWork outside of a LightningApp") | ||
| for w in app_ref.works: | ||
| if work.name == w.name: | ||
| # deep-copying the work object to avoid modifying the original work object | ||
| with _trimmed_work(w, to_trim=NON_PICKLABLE_WORK_ATTRIBUTES): | ||
| copied_work = deepcopy(w) | ||
| break | ||
| else: | ||
| raise ValueError(f"Work with name {work.name} not found in the app references") | ||
|
|
||
| # if the work is defined in __main__ or __mp_main__ (the entrypoint file for the `lightning run app` command), | ||
| # pickling/unpickling will fail, hence we need to patch the module information | ||
| if "_main__" in copied_work.__class__.__module__: | ||
| work_class_module = sys.modules[copied_work.__class__.__module__] | ||
| work_class_file = work_class_module.__file__ | ||
| if not work_class_file: | ||
| raise ValueError( | ||
| f"Cannot pickle work class {copied_work.__class__.__name__} because we " | ||
| f"couldn't identify the module file" | ||
| ) | ||
| relative_path = Path(work_class_module.__file__).relative_to(Path.cwd()) # type: ignore | ||
| expected_module_name = relative_path.as_posix().replace(".py", "").replace("/", ".") | ||
| # TODO @sherin: also check if the module is importable from the CWD | ||
| fake_module = types.ModuleType(expected_module_name) | ||
| fake_module.__dict__.update(work_class_module.__dict__) | ||
| fake_module.__dict__["__name__"] = expected_module_name | ||
| sys.modules[expected_module_name] = fake_module | ||
| for k, v in fake_module.__dict__.items(): | ||
| if not k.startswith("__") and hasattr(v, "__module__"): | ||
| if "_main__" in v.__module__: | ||
| v.__module__ = expected_module_name | ||
| return copied_work | ||
|
|
||
|
|
||
| def dump(work: LightningWork, f: typing.BinaryIO) -> None: | ||
| picklable_work = get_picklable_work(work) | ||
| pickle.dump(picklable_work, f) | ||
|
|
||
|
|
||
| def load(f: typing.BinaryIO) -> typing.Any: | ||
| # inject current working directory to sys.path | ||
| sys.path.insert(1, str(Path.cwd())) | ||
| work = pickle.load(f) | ||
| sys.path.pop(1) | ||
| return work | ||
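The trimming step in `_trimmed_work` can be illustrated in isolation. The sketch below is not the code from this PR: `Worker` and `trimmed` are hypothetical stand-ins, and the `try`/`finally` is a variation that restores the attributes even when the body raises, which the version in the diff does not do.

```python
import contextlib
import threading
from copy import deepcopy
from typing import Any, Dict, Iterator, List


class Worker:
    """Hypothetical stand-in for a work object; not the real LightningWork."""

    def __init__(self, name: str) -> None:
        self.name = name
        # Lock objects can be neither deep-copied nor pickled.
        self._lock = threading.Lock()


@contextlib.contextmanager
def trimmed(obj: Any, to_trim: List[str]) -> Iterator[None]:
    """Temporarily replace the named attributes with None."""
    held: Dict[str, Any] = {attr: getattr(obj, attr) for attr in to_trim}
    for attr in to_trim:
        setattr(obj, attr, None)
    try:
        yield
    finally:
        # Restore the original values even if the body raised.
        for attr, value in held.items():
            setattr(obj, attr, value)


worker = Worker("w0")
with trimmed(worker, ["_lock"]):
    # The copy is taken while the lock is hidden, so deepcopy succeeds.
    clone = deepcopy(worker)
```

After the block exits, `worker` holds its lock again, while `clone` carries `None` in its place.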
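`load` above inserts the current working directory into `sys.path` and pops it afterwards. A possible variation, sketched here under the assumption that a context manager is acceptable at the call site, removes the entry in a `finally` clause so that a failing `pickle.load` does not leave `sys.path` modified. The name `cwd_on_sys_path` is hypothetical and not part of `safe_pickle`.

```python
import contextlib
import sys
from pathlib import Path
from typing import Iterator


@contextlib.contextmanager
def cwd_on_sys_path(index: int = 1) -> Iterator[None]:
    """Temporarily make modules under the current directory importable."""
    entry = str(Path.cwd())
    sys.path.insert(index, entry)
    try:
        yield
    finally:
        # Remove by value rather than by index: imports executed in the
        # body may have inserted other entries and shifted positions.
        with contextlib.suppress(ValueError):
            sys.path.remove(entry)


before = list(sys.path)
with cwd_on_sys_path():
    inside = str(Path.cwd()) in sys.path
after = list(sys.path)
```

A caller would wrap the unpickling step in `with cwd_on_sys_path():`.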
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| import subprocess | ||
| from pathlib import Path | ||
|
|
||
|
|
||
| def test_safe_pickle_app(): | ||
| test_dir = Path(__file__).parent / "testdata" | ||
| proc = subprocess.Popen( | ||
| ["lightning", "run", "app", "safe_pickle_app.py", "--open-ui", "false"], stdout=subprocess.PIPE, cwd=test_dir | ||
| ) | ||
| stdout, _ = proc.communicate() | ||
| assert "Exiting the pickling app successfully" in stdout.decode("UTF-8") |
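The test above starts the app in a child process and asserts on its standard output. A minimal sketch of the same pattern follows, using `subprocess.run` with a timeout so that a hung child fails the test instead of blocking it. The child command is a placeholder that only prints the expected message; it does not invoke the `lightning` CLI, and `run_and_capture` is a hypothetical helper.

```python
import subprocess
import sys
from typing import List, Optional


def run_and_capture(cmd: List[str], cwd: Optional[str] = None, timeout: float = 60.0) -> str:
    """Run a command and return its combined stdout/stderr as text."""
    completed = subprocess.run(
        cmd,
        cwd=cwd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        timeout=timeout,  # raises subprocess.TimeoutExpired if exceeded
        check=False,
    )
    return completed.stdout.decode("UTF-8")


# Placeholder child process standing in for the app run.
output = run_and_capture(
    [sys.executable, "-c", "print('Exiting the pickling app successfully!!')"]
)
```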
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,63 @@ | ||
| """ | ||
| This app tests three things | ||
| 1. Can a work pickle `self` | ||
| 2. Can the pickled work be unpickled in another work | ||
| 3. Can the pickled work be unpickled from a script | ||
| """ | ||
|
|
||
| import subprocess | ||
| from pathlib import Path | ||
|
|
||
| from lightning_app import LightningApp, LightningFlow, LightningWork | ||
| from lightning_app.utilities import safe_pickle | ||
|
|
||
|
|
||
| class SelfPicklingWork(LightningWork): | ||
| def run(self): | ||
| with open("work.pkl", "wb") as f: | ||
| safe_pickle.dump(self, f) | ||
|
|
||
| def get_test_string(self): | ||
| return f"Hello from {self.__class__.__name__}!" | ||
|
|
||
|
|
||
| class WorkThatLoadsPickledWork(LightningWork): | ||
| def run(self): | ||
| with open("work.pkl", "rb") as f: | ||
| work = safe_pickle.load(f) | ||
| assert work.get_test_string() == "Hello from SelfPicklingWork!" | ||
|
|
||
|
|
||
| script_load_pickled_work = """ | ||
| import pickle | ||
| work = pickle.load(open("work.pkl", "rb")) | ||
| print(work.get_test_string()) | ||
| """ | ||
|
|
||
|
|
||
| class RootFlow(LightningFlow): | ||
| def __init__(self): | ||
| super().__init__() | ||
| self.self_pickling_work = SelfPicklingWork() | ||
| self.work_that_loads_pickled_work = WorkThatLoadsPickledWork() | ||
|
|
||
| def run(self): | ||
| self.self_pickling_work.run() | ||
| self.work_that_loads_pickled_work.run() | ||
|
|
||
| with open("script_that_loads_pickled_work.py", "w") as f: | ||
| f.write(script_load_pickled_work) | ||
|
|
||
| # read the output from subprocess | ||
| proc = subprocess.Popen(["python", "script_that_loads_pickled_work.py"], stdout=subprocess.PIPE) | ||
| assert "Hello from SelfPicklingWork" in proc.stdout.read().decode("UTF-8") | ||
|
|
||
| # deleting the script | ||
| Path("script_that_loads_pickled_work.py").unlink() | ||
| # deleting the pkl file | ||
| Path("work.pkl").unlink() | ||
|
|
||
| self._exit("Exiting the pickling app successfully!!") | ||
|
|
||
|
|
||
| app = LightningApp(RootFlow()) | ||
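The third check in this app (unpickling from a plain script) depends on the module patching in `get_picklable_work`: pickle stores a class by module name and qualified name, so a class recorded under `__main__` cannot be resolved by a different entrypoint. The sketch below shows the aliasing idea on its own. `Greeter`, `demo_entry`, `demo_alias`, and `alias_module` are hypothetical names, and a flat alias is used instead of a dotted path such as `foo.bar.app`.

```python
import pickle
import sys
import types


class Greeter:
    def hello(self) -> str:
        return "hello"


# Hypothetical stand-in for the entrypoint module (`__main__` in the PR).
entry = types.ModuleType("demo_entry")
entry.Greeter = Greeter
Greeter.__module__ = "demo_entry"
sys.modules["demo_entry"] = entry


def alias_module(cls: type, alias: str) -> None:
    """Register a copy of the class's module under `alias` and repoint the class."""
    source = sys.modules[cls.__module__]
    alias_mod = types.ModuleType(alias)
    alias_mod.__dict__.update(source.__dict__)
    alias_mod.__dict__["__name__"] = alias
    sys.modules[alias] = alias_mod
    # pickle reads this attribute to decide which module name to record.
    cls.__module__ = alias


alias_module(Greeter, "demo_alias")
payload = pickle.dumps(Greeter())
restored = pickle.loads(payload)
```

The payload now refers to `demo_alias` rather than `demo_entry`, so any process that can import a module of that name exposing `Greeter` can load it.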