From 2aee1b46bff6b55b7f0e3e15e1ad308ccd355a70 Mon Sep 17 00:00:00 2001 From: Matt Drozt Date: Fri, 23 Aug 2024 13:22:35 -0700 Subject: [PATCH 1/9] Add an `Experiment.stop` method to stop jobs prematurely. Teach launchers how to stop the jobs that they manage. --- .../_core/launcher/dragon/dragonLauncher.py | 5 + smartsim/_core/shell/shellLauncher.py | 36 +++++++- smartsim/_core/utils/launcher.py | 12 +++ smartsim/experiment.py | 19 ++++ tests/temp_tests/test_settings/conftest.py | 3 + .../temp_tests/test_settings/test_dispatch.py | 3 + tests/test_experiment.py | 91 +++++++++++++++++-- tests/test_generator.py | 4 +- tests/test_launch_history.py | 3 + 9 files changed, 163 insertions(+), 13 deletions(-) diff --git a/smartsim/_core/launcher/dragon/dragonLauncher.py b/smartsim/_core/launcher/dragon/dragonLauncher.py index 39e1958815..af26a8b820 100644 --- a/smartsim/_core/launcher/dragon/dragonLauncher.py +++ b/smartsim/_core/launcher/dragon/dragonLauncher.py @@ -263,6 +263,11 @@ def stop(self, step_name: str) -> StepInfo: step_info.launcher_status = str(JobStatus.CANCELLED) return step_info + def stop_jobs( + self, *launched_ids: LaunchedJobID + ) -> t.Mapping[LaunchedJobID, JobStatus]: + return {id_: self.stop(id_).status for id_ in launched_ids} + @staticmethod def _unprefix_step_id(step_id: str) -> str: return step_id.split("-", maxsplit=1)[1] diff --git a/smartsim/_core/shell/shellLauncher.py b/smartsim/_core/shell/shellLauncher.py index 95ded35dd9..50e7573c51 100644 --- a/smartsim/_core/shell/shellLauncher.py +++ b/smartsim/_core/shell/shellLauncher.py @@ -67,15 +67,19 @@ def start( self._launched[id_] = sp.Popen((helpers.expand_exe_path(exe), *rest), cwd=path) return id_ + def _get_proc_from_job_id(self, id_: LaunchedJobID, /) -> sp.Popen[bytes]: + if (proc := self._launched.get(id_)) is None: + msg = f"Launcher `{self}` has not launched a job with id `{id_}`" + raise errors.LauncherJobNotFound(msg) + return proc + def get_status( self, *launched_ids: 
LaunchedJobID ) -> t.Mapping[LaunchedJobID, JobStatus]: return {id_: self._get_status(id_) for id_ in launched_ids} def _get_status(self, id_: LaunchedJobID, /) -> JobStatus: - if (proc := self._launched.get(id_)) is None: - msg = f"Launcher `{self}` has not launched a job with id `{id_}`" - raise errors.LauncherJobNotFound(msg) + proc = self._get_proc_from_job_id(id_) ret_code = proc.poll() if ret_code is None: status = psutil.Process(proc.pid).status() @@ -97,6 +101,32 @@ def _get_status(self, id_: LaunchedJobID, /) -> JobStatus: return JobStatus.COMPLETED return JobStatus.FAILED + def stop_jobs( + self, *launched_ids: LaunchedJobID + ) -> t.Mapping[LaunchedJobID, JobStatus]: + return {id_: self._stop(id_) for id_ in launched_ids} + + def _stop(self, id_: LaunchedJobID, /) -> JobStatus: + proc = self._get_proc_from_job_id(id_) + wait_time = 5 + if proc.poll() is None: + msg = f"Attempting to terminate local process {proc.pid}" + logger.debug(msg) + proc.terminate() + + try: + proc.wait(wait_time) + except TimeoutError: + msg = f"Failed to terminate process {proc.pid}. Attempting to kill." + logger.warning(msg) + proc.kill() + + try: + proc.wait(wait_time) + except TimeoutError: + logger.error(f"Failed to kill process {proc.pid}") + return self._get_status(id_) + @classmethod def create(cls, _: Experiment) -> Self: return cls() diff --git a/smartsim/_core/utils/launcher.py b/smartsim/_core/utils/launcher.py index 32ca3b2e57..e8f2904761 100644 --- a/smartsim/_core/utils/launcher.py +++ b/smartsim/_core/utils/launcher.py @@ -89,3 +89,15 @@ def get_status( the ids of the `launched_ids` collection is not recognized. 
:returns: A mapping of launched id to current status """ + + @abc.abstractmethod + def stop_jobs( + self, *launched_ids: LaunchedJobID + ) -> t.Mapping[LaunchedJobID, JobStatus]: + """Given a collection of launched job ids, cancel the launched jobs + + :param launched_ids: The ids of the jobs to stop + :raises smartsim.error.errors.LauncherJobNotFound: If at least one of + the ids of the `launched_ids` collection is not recognized. + :returns: A mapping of launched id to status upon cancellation + """ diff --git a/smartsim/experiment.py b/smartsim/experiment.py index 8cb4dad249..c67e0e1340 100644 --- a/smartsim/experiment.py +++ b/smartsim/experiment.py @@ -368,6 +368,25 @@ def summary(self, style: str = "github") -> str: disable_numparse=True, ) + def stop(self, *ids: LaunchedJobID) -> tuple[JobStatus | InvalidJobStatus, ...]: + """Cancel the execution of a previously launched job. + + :param ids: The ids of the launched jobs to stop. + :raises ValueError: No job ids were provided. + :returns: A tuple of job statuses upon cancellation with order + respective of the order of the calling arguments. + """ + if not ids: + raise ValueError("No job ids provided") + by_launcher = self._launch_history.group_by_launcher(set(ids), unknown_ok=True) + id_to_stop_stat = ( + launcher.stop_jobs(*launched).items() + for launcher, launched in by_launcher.items() + ) + stats_map = dict(itertools.chain.from_iterable(id_to_stop_stat)) + stats = (stats_map.get(id_, InvalidJobStatus.NEVER_STARTED) for id_ in ids) + return tuple(stats) + @property def telemetry(self) -> TelemetryConfiguration: """Return the telemetry configuration for this entity. 
diff --git a/tests/temp_tests/test_settings/conftest.py b/tests/temp_tests/test_settings/conftest.py index 70b03630a6..1585613c71 100644 --- a/tests/temp_tests/test_settings/conftest.py +++ b/tests/temp_tests/test_settings/conftest.py @@ -68,4 +68,7 @@ def create(cls, exp): def get_status(self, *ids): raise NotImplementedError + def stop_jobs(self, *ids): + raise NotImplementedError + yield _MockLauncher() diff --git a/tests/temp_tests/test_settings/test_dispatch.py b/tests/temp_tests/test_settings/test_dispatch.py index f1545f58ee..89303b5a37 100644 --- a/tests/temp_tests/test_settings/test_dispatch.py +++ b/tests/temp_tests/test_settings/test_dispatch.py @@ -263,6 +263,9 @@ def start(self, strs): def get_status(self, *ids): raise NotImplementedError + def stop_jobs(self, *ids): + raise NotImplementedError + class BufferWriterLauncherSubclass(BufferWriterLauncher): ... diff --git a/tests/test_experiment.py b/tests/test_experiment.py index 2af864ab81..f4e06fd471 100644 --- a/tests/test_experiment.py +++ b/tests/test_experiment.py @@ -136,6 +136,9 @@ def start(self, record: LaunchRecord): def get_status(self, *ids): raise NotImplementedError + def stop_jobs(self, *ids): + raise NotImplementedError + @dataclasses.dataclass(frozen=True) class LaunchRecord: @@ -305,9 +308,14 @@ def start(self, _): def get_status(self, *ids: LaunchedJobID): return {id_: self.id_to_status[id_] for id_ in ids} + def stop_jobs(self, *ids: LaunchedJobID): + stopped = {id_: JobStatus.CANCELLED for id_ in ids} + self.id_to_status |= stopped + return stopped + @pytest.fixture -def make_populated_experment(monkeypatch, experiment): +def make_populated_experiment(monkeypatch, experiment): def impl(num_active_launchers): new_launchers = (GetStatusLauncher() for _ in range(num_active_launchers)) id_to_launcher = { @@ -321,8 +329,8 @@ def impl(num_active_launchers): yield impl -def test_experiment_can_get_statuses(make_populated_experment): - exp = make_populated_experment(num_active_launchers=1) 
+def test_experiment_can_get_statuses(make_populated_experiment): + exp = make_populated_experiment(num_active_launchers=1) (launcher,) = exp._launch_history.iter_past_launchers() ids = tuple(launcher.known_ids) recieved_stats = exp.get_status(*ids) @@ -337,9 +345,9 @@ def test_experiment_can_get_statuses(make_populated_experment): [pytest.param(i, id=f"{i} launcher(s)") for i in (2, 3, 5, 10, 20, 100)], ) def test_experiment_can_get_statuses_from_many_launchers( - make_populated_experment, num_launchers + make_populated_experiment, num_launchers ): - exp = make_populated_experment(num_active_launchers=num_launchers) + exp = make_populated_experiment(num_active_launchers=num_launchers) launcher_and_rand_ids = ( (launcher, random.choice(tuple(launcher.id_to_status))) for launcher in exp._launch_history.iter_past_launchers() @@ -354,9 +362,9 @@ def test_experiment_can_get_statuses_from_many_launchers( def test_get_status_returns_not_started_for_unrecognized_ids( - monkeypatch, make_populated_experment + monkeypatch, make_populated_experiment ): - exp = make_populated_experment(num_active_launchers=1) + exp = make_populated_experiment(num_active_launchers=1) brand_new_id = create_job_id() ((launcher, (id_not_known_by_exp, *rest)),) = ( exp._launch_history.group_by_launcher().items() @@ -369,7 +377,7 @@ def test_get_status_returns_not_started_for_unrecognized_ids( def test_get_status_de_dups_ids_passed_to_launchers( - monkeypatch, make_populated_experment + monkeypatch, make_populated_experiment ): def track_calls(fn): calls = [] @@ -380,7 +388,7 @@ def impl(*a, **kw): return calls, impl - exp = make_populated_experment(num_active_launchers=1) + exp = make_populated_experiment(num_active_launchers=1) ((launcher, (id_, *_)),) = exp._launch_history.group_by_launcher().items() calls, tracked_get_status = track_calls(launcher.get_status) monkeypatch.setattr(launcher, "get_status", tracked_get_status) @@ -390,3 +398,68 @@ def impl(*a, **kw): assert len(calls) == 1, 
"Launcher's `get_status` was called more than once" (call,) = calls assert call == ((id_,), {}), "IDs were not de-duplicated" + + +@pytest.mark.parametrize( + "num_launchers", + [pytest.param(i, id=f"{i} launcher(s)") for i in (2, 3, 5, 10, 20, 100)], +) +@pytest.mark.parametrize( + "select_ids", + [ + pytest.param( + lambda history: history._id_to_issuer.keys(), id="All launched jobs" + ), + pytest.param( + lambda history: next(iter(history.group_by_launcher().values())), + id="All from one launcher", + ), + pytest.param( + lambda history: itertools.chain.from_iterable( + random.sample(tuple(ids), len(JobStatus) // 2) + for ids in history.group_by_launcher().values() + ), + id="Subset per launcher", + ), + pytest.param( + lambda history: random.sample( + tuple(history._id_to_issuer), len(history._id_to_issuer) // 3 + ), + id=f"Random subset across all launchers", + ), + ], +) +def test_experiment_can_stop_jobs(make_populated_experiment, num_launchers, select_ids): + exp = make_populated_experiment(num_launchers) + ids = (launcher.known_ids for launcher in exp._launch_history.iter_past_launchers()) + ids = tuple(itertools.chain.from_iterable(ids)) + before_stop_stats = exp.get_status(*ids) + to_cancel = tuple(select_ids(exp._launch_history)) + stats = exp.stop(*to_cancel) + after_stop_stats = exp.get_status(*ids) + assert stats == (JobStatus.CANCELLED,) * len(to_cancel) + assert dict(zip(ids, before_stop_stats)) | dict(zip(to_cancel, stats)) == dict( + zip(ids, after_stop_stats) + ) + + +def test_experiment_raises_if_asked_to_stop_no_jobs(experiment): + with pytest.raises(ValueError, match="No job ids provided"): + experiment.stop() + + +@pytest.mark.parametrize( + "num_launchers", + [pytest.param(i, id=f"{i} launcher(s)") for i in (2, 3, 5, 10, 20, 100)], +) +def test_experiment_does_not_raise_on_unknown_job_id( + make_populated_experiment, num_launchers +): + exp = make_populated_experiment(num_launchers) + new_id = create_job_id() + all_known_ids = 
tuple(exp._launch_history._id_to_issuer) + before_cancel = exp.get_status(*all_known_ids) + (stat,) = exp.stop(new_id) + assert stat == InvalidJobStatus.NEVER_STARTED + after_cancel = exp.get_status(*all_known_ids) + assert before_cancel == after_cancel diff --git a/tests/test_generator.py b/tests/test_generator.py index 4ecda339bf..3bf69ef767 100644 --- a/tests/test_generator.py +++ b/tests/test_generator.py @@ -21,9 +21,11 @@ pytestmark = pytest.mark.group_a +ID_GENERATOR = (str(i) for i in itertools.count()) + def random_id(): - return str(random.randint(1, 100)) + return next(ID_GENERATOR) @pytest.fixture diff --git a/tests/test_launch_history.py b/tests/test_launch_history.py index 9d3bb31ac4..3b4cd5bcc5 100644 --- a/tests/test_launch_history.py +++ b/tests/test_launch_history.py @@ -48,6 +48,9 @@ def start(self, _): def get_status(self, *_): raise NotImplementedError + def stop_jobs(self, *_): + raise NotImplementedError + LAUNCHER_INSTANCE_A = MockLancher() LAUNCHER_INSTANCE_B = MockLancher() From 5a9433f17d47314ffdd401902203cb22586205cc Mon Sep 17 00:00:00 2001 From: Matt Drozt Date: Tue, 27 Aug 2024 11:47:39 -0700 Subject: [PATCH 2/9] Better test name --- tests/test_experiment.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_experiment.py b/tests/test_experiment.py index f4e06fd471..c39afee1ba 100644 --- a/tests/test_experiment.py +++ b/tests/test_experiment.py @@ -452,7 +452,7 @@ def test_experiment_raises_if_asked_to_stop_no_jobs(experiment): "num_launchers", [pytest.param(i, id=f"{i} launcher(s)") for i in (2, 3, 5, 10, 20, 100)], ) -def test_experiment_does_not_raise_on_unknown_job_id( +def test_experiment_stop_does_not_raise_on_unknown_job_id( make_populated_experiment, num_launchers ): exp = make_populated_experiment(num_launchers) From 415bff8c5bad0ee275877968a62f4b39594816c6 Mon Sep 17 00:00:00 2001 From: Matt Drozt Date: Thu, 29 Aug 2024 14:24:11 -0700 Subject: [PATCH 3/9] Docs it up! 
--- .../_core/launcher/dragon/dragonLauncher.py | 7 +++++ smartsim/_core/shell/shellLauncher.py | 30 +++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/smartsim/_core/launcher/dragon/dragonLauncher.py b/smartsim/_core/launcher/dragon/dragonLauncher.py index 654e461ce7..f8e2f483d6 100644 --- a/smartsim/_core/launcher/dragon/dragonLauncher.py +++ b/smartsim/_core/launcher/dragon/dragonLauncher.py @@ -267,6 +267,13 @@ def stop(self, step_name: str) -> StepInfo: def stop_jobs( self, *launched_ids: LaunchedJobID ) -> t.Mapping[LaunchedJobID, JobStatus]: + """Take a collection of job ids and issue stop requests to the dragon + backend for each. + + :param launched_ids: The ids of the launched jobs to stop. + :returns: A mapping of ids for jobs to stop to their reported status + after attempting to stop them. + """ return {id_: self.stop(id_).status for id_ in launched_ids} @staticmethod diff --git a/smartsim/_core/shell/shellLauncher.py b/smartsim/_core/shell/shellLauncher.py index 8210cecbb9..6e4c82945e 100644 --- a/smartsim/_core/shell/shellLauncher.py +++ b/smartsim/_core/shell/shellLauncher.py @@ -144,6 +144,15 @@ def start(self, shell_command: ShellLauncherCommand) -> LaunchedJobID: return id_ def _get_proc_from_job_id(self, id_: LaunchedJobID, /) -> sp.Popen[bytes]: + """Given an issued job id, return the process represented by that id + + :param id_: The launched job id of the process + :raises: errors.LauncherJobNotFound: The id could not be mapped to a + process. This usually means that the provided id was not issued by + this launcher instance. + :returns: The process that the shell launcher started and represented + by the issued id. 
+ """ if (proc := self._launched.get(id_)) is None: msg = f"Launcher `{self}` has not launched a job with id `{id_}`" raise errors.LauncherJobNotFound(msg) @@ -180,9 +189,30 @@ def _get_status(self, id_: LaunchedJobID, /) -> JobStatus: def stop_jobs( self, *launched_ids: LaunchedJobID ) -> t.Mapping[LaunchedJobID, JobStatus]: + """Take a collection of job ids and kill the corresponding processes + started by the shell launcher. + + :param launched_ids: The ids of the launched jobs to stop. + :returns: A mapping of ids for jobs to stop to their reported status + after attempting to stop them. + """ return {id_: self._stop(id_) for id_ in launched_ids} def _stop(self, id_: LaunchedJobID, /) -> JobStatus: + """Stop a job represented by an id + + The launcher will first start by attempting to kill the process using + a SIGTERM signal and then waiting for an amount of time. If + the process is not killed by the timeout time, a SIGKILL signal will be + sent and another waiting period will be started. If the period also + ends, the message will be logged and the process will be left to + continue running. The method will then get and return the status of the + job. + + :param id_: The id of a launched job to stop. + :returns: The status of the job after sending signals to terminate the + started process.
+ """ proc = self._get_proc_from_job_id(id_) wait_time = 5 if proc.poll() is None: From ed4fe1b6b217450239db73432f21d06506de8310 Mon Sep 17 00:00:00 2001 From: Matt Drozt Date: Thu, 29 Aug 2024 16:05:58 -0700 Subject: [PATCH 4/9] Add shell launcher stop tests --- smartsim/_core/shell/shellLauncher.py | 9 ++- tests/test_shell_launcher.py | 106 +++++++++++++++++++++++--- 2 files changed, 101 insertions(+), 14 deletions(-) diff --git a/smartsim/_core/shell/shellLauncher.py b/smartsim/_core/shell/shellLauncher.py index 6e4c82945e..90b09b8ef8 100644 --- a/smartsim/_core/shell/shellLauncher.py +++ b/smartsim/_core/shell/shellLauncher.py @@ -198,7 +198,7 @@ def stop_jobs( """ return {id_: self._stop(id_) for id_ in launched_ids} - def _stop(self, id_: LaunchedJobID, /) -> JobStatus: + def _stop(self, id_: LaunchedJobID, /, wait_time: float = 5.0) -> JobStatus: """Stop a job represented by an id The launcher will first start by attempting to kill the process using @@ -210,11 +210,12 @@ def _stop(self, id_: LaunchedJobID, /) -> JobStatus: job. :param id_: The id of a launched job to stop. + :param wait_time: The maximum amount of time, in seconds, to wait for a + signal to stop a process. :returns: The status of the job after sending signals to terminate the started process. """ proc = self._get_proc_from_job_id(id_) - wait_time = 5 if proc.poll() is None: msg = f"Attempting to terminate local process {proc.pid}" logger.debug(msg) @@ -222,14 +223,14 @@ def _stop(self, id_: LaunchedJobID, /) -> JobStatus: try: proc.wait(wait_time) - except TimeoutError: + except sp.TimeoutExpired: msg = f"Failed to terminate process {proc.pid}. Attempting to kill."
logger.warning(msg) proc.kill() try: proc.wait(wait_time) - except TimeoutError: + except sp.TimeoutExpired: logger.error(f"Failed to kill process {proc.pid}") return self._get_status(id_) diff --git a/tests/test_shell_launcher.py b/tests/test_shell_launcher.py index b626fbcc62..766a596eca 100644 --- a/tests/test_shell_launcher.py +++ b/tests/test_shell_launcher.py @@ -24,9 +24,12 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +import contextlib import os import pathlib import subprocess +import sys +import textwrap import unittest.mock import psutil @@ -83,20 +86,37 @@ def shell_launcher(): launcher = ShellLauncher() yield launcher if any(proc.poll() is None for proc in launcher._launched.values()): - raise ("Test leaked processes") + raise RuntimeError("Test leaked processes") @pytest.fixture -def shell_cmd(test_dir: str) -> ShellLauncherCommand: - """Fixture to create an instance of Generator.""" - run_dir, out_file, err_file = generate_directory(test_dir) - with ( - open(out_file, "w", encoding="utf-8") as out, - open(err_file, "w", encoding="utf-8") as err, +def make_shell_command(test_dir): + run_dir, out_file_, err_file_ = generate_directory(test_dir) + + @contextlib.contextmanager + def impl( + args: t.Sequence[str], + working_dir: str | os.PathLike[str] = run_dir, + env: dict[str, str] | None = None, + out_file: str | os.PathLike[str] = out_file_, + err_file: str | os.PathLike[str] = err_file_, ): - yield ShellLauncherCommand( - {}, run_dir, out, err, EchoHelloWorldEntity().as_program_arguments() - ) + with ( + open(out_file, "w", encoding="utf-8") as out, + open(err_file, "w", encoding="utf-8") as err, + ): + yield ShellLauncherCommand( + env or {}, pathlib.Path(working_dir), out, err, tuple(args) + ) + + yield impl + + +@pytest.fixture +def shell_cmd(make_shell_command) -> ShellLauncherCommand: + """Fixture to create an instance of Generator.""" + 
with make_shell_command(EchoHelloWorldEntity().as_program_arguments()) as hello: + yield hello # UNIT TESTS @@ -310,3 +330,69 @@ def test_get_status_maps_correctly( value = shell_launcher.get_status(id) assert value.get(id) == job_status assert proc.wait() == 0 + + +@pytest.mark.parametrize( + "args", + ( + pytest.param(("sleep", "60"), id="Sleep for a minute"), + pytest.param( + ( + sys.executable, + "-c", + textwrap.dedent("""\ + import signal, time + signal.signal(signal.SIGINT, lambda n, f: print("Ignoring")) + time.sleep(60) + """), + ), + id="Process Swallows SIGINT", + ), + pytest.param( + ( + sys.executable, + "-c", + textwrap.dedent("""\ + import signal, time + signal.signal(signal.SIGTERM, lambda n, f: print("Ignoring")) + time.sleep(60) + """), + ), + id="Process Swallows SIGTERM", + ), + ), +) +def test_launcher_can_stop_processes(shell_launcher, make_shell_command, args): + with make_shell_command(args) as cmd: + start = time.perf_counter() + id_ = shell_launcher.start(cmd) + time.sleep(0.1) + assert {id_: JobStatus.RUNNING} == shell_launcher.get_status(id_) + assert JobStatus.FAILED == shell_launcher._stop(id_, wait_time=0.25) + end = time.perf_counter() + assert {id_: JobStatus.FAILED} == shell_launcher.get_status(id_) + proc = shell_launcher._launched[id_] + assert proc.poll() is not None + assert proc.poll() != 0 + assert end - start < 1 + + +def test_launcher_can_stop_many_processes( + make_shell_command, shell_launcher, shell_cmd +): + with ( + make_shell_command(("sleep", "60")) as sleep_60, + make_shell_command(("sleep", "45")) as sleep_45, + make_shell_command(("sleep", "30")) as sleep_30, + ): + id_60 = shell_launcher.start(sleep_60) + id_45 = shell_launcher.start(sleep_45) + id_30 = shell_launcher.start(sleep_30) + id_short = shell_launcher.start(shell_cmd) + time.sleep(0.1) + assert { + id_60: JobStatus.FAILED, + id_45: JobStatus.FAILED, + id_30: JobStatus.FAILED, + id_short: JobStatus.COMPLETED, + } == shell_launcher.stop_jobs(id_30, 
id_45, id_60, id_short) From 2afa9f75c9708df4aa384edc0cfba22ee6096811 Mon Sep 17 00:00:00 2001 From: Matt Drozt Date: Thu, 29 Aug 2024 16:33:17 -0700 Subject: [PATCH 5/9] Rm dup param --- tests/test_shell_launcher.py | 40 +++++++++++++++--------------------- 1 file changed, 16 insertions(+), 24 deletions(-) diff --git a/tests/test_shell_launcher.py b/tests/test_shell_launcher.py index c84b0e9f37..ae3d95f7f8 100644 --- a/tests/test_shell_launcher.py +++ b/tests/test_shell_launcher.py @@ -336,29 +336,21 @@ def test_get_status_maps_correctly( "args", ( pytest.param(("sleep", "60"), id="Sleep for a minute"), - pytest.param( - ( - sys.executable, - "-c", - textwrap.dedent("""\ - import signal, time - signal.signal(signal.SIGINT, lambda n, f: print("Ignoring")) - time.sleep(60) - """), - ), - id="Process Swallows SIGINT", - ), - pytest.param( - ( - sys.executable, - "-c", - textwrap.dedent("""\ - import signal, time - signal.signal(signal.SIGTERM, lambda n, f: print("Ignoring")) - time.sleep(60) - """), - ), - id="Process Swallows SIGTERM", + *( + pytest.param( + ( + sys.executable, + "-c", + textwrap.dedent(f"""\ + import signal, time + signal.signal(signal.{signal_name}, + lambda n, f: print("Ignoring")) + time.sleep(60) + """), + ), + id=f"Process Swallows {signal_name}", + ) + for signal_name in ("SIGINT", "SIGTERM") ), ), ) @@ -374,7 +366,7 @@ def test_launcher_can_stop_processes(shell_launcher, make_shell_command, args): proc = shell_launcher._launched[id_] assert proc.poll() is not None assert proc.poll() != 0 - assert end - start < 1 + assert 0.1 < end - start < 1 def test_launcher_can_stop_many_processes( From 875596c1ec6d359e12cb26f6f3f317e118cce482 Mon Sep 17 00:00:00 2001 From: Matt Drozt Date: Thu, 29 Aug 2024 16:44:55 -0700 Subject: [PATCH 6/9] Make 3.9 happy --- tests/test_shell_launcher.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_shell_launcher.py b/tests/test_shell_launcher.py index ae3d95f7f8..95e8847108 100644 --- 
a/tests/test_shell_launcher.py +++ b/tests/test_shell_launcher.py @@ -24,6 +24,8 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +from __future__ import annotations + import contextlib import os import pathlib From 066b52e4e4e4e646f042bd5f8f5484ecc6f3199d Mon Sep 17 00:00:00 2001 From: Matt Drozt Date: Fri, 30 Aug 2024 09:34:57 -0700 Subject: [PATCH 7/9] Even more docs! --- smartsim/_core/shell/shellLauncher.py | 33 +++++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/smartsim/_core/shell/shellLauncher.py b/smartsim/_core/shell/shellLauncher.py index 922bf7b8a0..e12567063c 100644 --- a/smartsim/_core/shell/shellLauncher.py +++ b/smartsim/_core/shell/shellLauncher.py @@ -119,16 +119,28 @@ def impl( class ShellLauncher: - """Mock launcher for launching/tracking simple shell commands""" + """A launcher for launching/tracking local shell commands""" def __init__(self) -> None: + """Initialize a new shell launcher.""" self._launched: dict[LaunchedJobID, sp.Popen[bytes]] = {} def check_popen_inputs(self, shell_command: ShellLauncherCommand) -> None: + """Validate that the contents of a shell command are valid. + + :param shell_command: The command to validate + :raises ValueError: If the command is not valid + """ if not shell_command.path.exists(): raise ValueError("Please provide a valid path to ShellLauncherCommand.") def start(self, shell_command: ShellLauncherCommand) -> LaunchedJobID: + """Have the shell launcher start and track the progress of a new + subprocess. + + :param shell_command: The template of a subprocess to start. + :returns: An id to reference the process for status. 
+ """ self.check_popen_inputs(shell_command) id_ = create_job_id() exe, *rest = shell_command.command_tuple @@ -144,7 +156,7 @@ def start(self, shell_command: ShellLauncherCommand) -> LaunchedJobID: return id_ def _get_proc_from_job_id(self, id_: LaunchedJobID, /) -> sp.Popen[bytes]: - """Given an issued job id, return the process represented by that id + """Given an issued job id, return the process represented by that id. :param id_: The launched job id of the process :raises: errors.LauncherJobNotFound: The id could not be mapped to a @@ -161,9 +173,21 @@ def _get_proc_from_job_id(self, id_: LaunchedJobID, /) -> sp.Popen[bytes]: def get_status( self, *launched_ids: LaunchedJobID ) -> t.Mapping[LaunchedJobID, JobStatus]: + """Take a collection of job ids and return the status of the + corresponding processes started by the shell launcher. + + :param launched_ids: A collection of ids of the launched jobs to get + the statuses of. + :returns: A mapping of ids for jobs to their reported status. + """ return {id_: self._get_status(id_) for id_ in launched_ids} def _get_status(self, id_: LaunchedJobID, /) -> JobStatus: + """Given an issued job id, return the status of the process it represents. + + :param id_: The launched job id of the process to get the status of. + :returns: The status of that process represented by the given id. + """ proc = self._get_proc_from_job_id(id_) ret_code = proc.poll() if ret_code is None: @@ -236,4 +260,9 @@ def _stop(self, id_: LaunchedJobID, /, wait_time: float = 5.0) -> JobStatus: @classmethod def create(cls, _: Experiment) -> Self: + """Create a new launcher instance from an experiment instance. + + :param _: An experiment instance. + :returns: A new launcher instance.
+ """ return cls() From ae4fc1e18146ebc0bf2ae2e1cdd6d17f4517694f Mon Sep 17 00:00:00 2001 From: Matt Drozt Date: Fri, 30 Aug 2024 09:39:35 -0700 Subject: [PATCH 8/9] Lint --- smartsim/_core/shell/shellLauncher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/smartsim/_core/shell/shellLauncher.py b/smartsim/_core/shell/shellLauncher.py index e12567063c..c22ba6ba83 100644 --- a/smartsim/_core/shell/shellLauncher.py +++ b/smartsim/_core/shell/shellLauncher.py @@ -261,7 +261,7 @@ def _stop(self, id_: LaunchedJobID, /, wait_time: float = 5.0) -> JobStatus: @classmethod def create(cls, _: Experiment) -> Self: """Create a new launcher instance from an experiment instance. - + :param _: An experiment instance. :returns: A new launcher instance. """ From 24a170f7399361a0f437af4239595ce7db0fb7f5 Mon Sep 17 00:00:00 2001 From: Matt Drozt Date: Fri, 30 Aug 2024 10:41:55 -0700 Subject: [PATCH 9/9] Testing launcher acts like a 'real' launcher --- tests/test_experiment.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/test_experiment.py b/tests/test_experiment.py index 8dc5113d5c..aff32604c0 100644 --- a/tests/test_experiment.py +++ b/tests/test_experiment.py @@ -42,6 +42,7 @@ from smartsim._core.control.launch_history import LaunchHistory from smartsim._core.utils.launcher import LauncherProtocol, create_job_id from smartsim.entity import entity +from smartsim.error import errors from smartsim.experiment import Experiment from smartsim.launchable import job from smartsim.settings import launchSettings @@ -318,10 +319,16 @@ def create(cls, _): def start(self, _): raise NotImplementedError("{type(self).__name__} should not start anything") + def _assert_ids(self, ids: LaunchedJobID): + if any(id_ not in self.id_to_status for id_ in ids): + raise errors.LauncherJobNotFound + def get_status(self, *ids: LaunchedJobID): + self._assert_ids(ids) return {id_: self.id_to_status[id_] for id_ in ids} def stop_jobs(self, *ids: LaunchedJobID): + 
self._assert_ids(ids) stopped = {id_: JobStatus.CANCELLED for id_ in ids} self.id_to_status |= stopped return stopped