From e7e59d74736b134cefc7c14bb8015b906c9324e3 Mon Sep 17 00:00:00 2001
From: qidewenwhen
Date: Thu, 11 Jan 2024 16:03:07 -0800
Subject: [PATCH] feat: Support selective pipeline execution for function step

---
 .../core/pipeline_variables.py                |  27 +--
 src/sagemaker/remote_function/job.py          |  23 +-
 .../runtime_environment_manager.py            |   4 +-
 src/sagemaker/workflow/function_step.py       |   7 +
 src/sagemaker/workflow/pipeline.py            |  10 +-
 tests/integ/sagemaker/workflow/helpers.py     |  34 ++-
 .../workflow/test_selective_execution.py      | 201 ++++++++++++++++++
 .../sagemaker/workflow/test_step_decorator.py |   5 +-
 .../core/test_pipeline_variables.py           |  98 ++++++---
 .../core/test_stored_function.py              |  12 +-
 .../test_runtime_environment_manager.py       |   6 +-
 .../sagemaker/remote_function/test_job.py     |  36 +++-
 .../unit/sagemaker/workflow/test_pipeline.py  |  63 ++++--
 13 files changed, 439 insertions(+), 87 deletions(-)
 create mode 100644 tests/integ/sagemaker/workflow/test_selective_execution.py

diff --git a/src/sagemaker/remote_function/core/pipeline_variables.py b/src/sagemaker/remote_function/core/pipeline_variables.py
index 269ce94113..952cccdb07 100644
--- a/src/sagemaker/remote_function/core/pipeline_variables.py
+++ b/src/sagemaker/remote_function/core/pipeline_variables.py
@@ -19,6 +19,7 @@
 
 from sagemaker.s3 import s3_path_join
 from sagemaker.remote_function.core.serialization import deserialize_obj_from_s3
+from sagemaker.workflow.step_outputs import get_step
 
 
 @dataclass
@@ -92,7 +93,7 @@ class _S3BaseUriIdentifier:
 class _DelayedReturn:
     """Delayed return from a function."""
 
-    uri: List[Union[str, _Parameter, _ExecutionVariable]]
+    uri: Union[_Properties, List[Union[str, _Parameter, _ExecutionVariable]]]
     reference_path: Tuple = field(default_factory=tuple)
 
 
@@ -164,6 +165,7 @@ def __init__(
         self,
         delayed_returns: List[_DelayedReturn],
         hmac_key: str,
+        properties_resolver: _PropertiesResolver,
         parameter_resolver: _ParameterResolver,
         execution_variable_resolver: _ExecutionVariableResolver,
         s3_base_uri: str,
@@ -174,6 +176,7 @@ def __init__(
         Args:
             delayed_returns: list of delayed returns to resolve.
             hmac_key: key used to encrypt serialized and deserialized function and arguments.
+            properties_resolver: resolver used to resolve step properties.
            parameter_resolver: resolver used to pipeline parameters.
            execution_variable_resolver: resolver used to resolve execution variables.
            s3_base_uri (str): the s3 base uri of the function step that
@@ -184,6 +187,7 @@ def __init__(
         self._s3_base_uri = s3_base_uri
         self._parameter_resolver = parameter_resolver
         self._execution_variable_resolver = execution_variable_resolver
+        self._properties_resolver = properties_resolver
         # different delayed returns can have the same uri, so we need to dedupe
         uris = {
             self._resolve_delayed_return_uri(delayed_return) for delayed_return in delayed_returns
@@ -214,7 +218,10 @@ def resolve(self, delayed_return: _DelayedReturn) -> Any:
 
     def _resolve_delayed_return_uri(self, delayed_return: _DelayedReturn):
         """Resolve the s3 uri of the delayed return."""
+        if isinstance(delayed_return.uri, _Properties):
+            return self._properties_resolver.resolve(delayed_return.uri)
+
+        # Keep the old resolution logic below for backward compatibility
         uri = []
         for component in delayed_return.uri:
             if isinstance(component, _Parameter):
@@ -274,6 +281,7 @@ def resolve_pipeline_variables(
     delayed_return_resolver = _DelayedReturnResolver(
         delayed_returns=delayed_returns,
         hmac_key=hmac_key,
+        properties_resolver=properties_resolver,
         parameter_resolver=parameter_resolver,
         execution_variable_resolver=execution_variable_resolver,
         s3_base_uri=s3_base_uri,
@@ -325,27 +333,12 @@ def convert_pipeline_variables_to_pickleable(func_args: Tuple, func_kwargs: Dict
 
     from sagemaker.workflow.entities import PipelineVariable
 
-    from sagemaker.workflow.execution_variables import ExecutionVariables
-
     from sagemaker.workflow.function_step import DelayedReturn
 
-    # Notes:
-    # 1. The s3_base_uri = s3_root_uri + pipeline_name, but the two may be unknown
-    #    when defining function steps. After step-level arg serialization,
-    #    it's hard to update the s3_base_uri in pipeline compile time.
-    #    Thus set a placeholder: _S3BaseUriIdentifier, and let the runtime job to resolve it.
-    # 2. For saying s3_root_uri is unknown, it's because when defining function steps,
-    #    the pipeline's sagemaker_session is not passed in, but the default s3_root_uri
-    #    should be retrieved from the pipeline's sagemaker_session.
     def convert(arg):
         if isinstance(arg, DelayedReturn):
             return _DelayedReturn(
-                uri=[
-                    _S3BaseUriIdentifier(),
-                    ExecutionVariables.PIPELINE_EXECUTION_ID._pickleable,
-                    arg._step.name,
-                    "results",
-                ],
+                uri=get_step(arg)._properties.OutputDataConfig.S3OutputPath._pickleable,
                 reference_path=arg._reference_path,
             )
 
diff --git a/src/sagemaker/remote_function/job.py b/src/sagemaker/remote_function/job.py
index 205a2adf41..71530ac4dd 100644
--- a/src/sagemaker/remote_function/job.py
+++ b/src/sagemaker/remote_function/job.py
@@ -60,6 +60,7 @@
 from sagemaker import vpc_utils
 from sagemaker.remote_function.core.stored_function import StoredFunction, _SerializedData
 from sagemaker.remote_function.core.pipeline_variables import Context
+
 from sagemaker.remote_function.runtime_environment.runtime_environment_manager import (
     RuntimeEnvironmentManager,
     _DependencySettings,
@@ -72,6 +73,8 @@
     copy_workdir,
     resolve_custom_file_filter_from_config_file,
 )
+from sagemaker.workflow.function_step import DelayedReturn
+from sagemaker.workflow.step_outputs import get_step
 
 if TYPE_CHECKING:
     from sagemaker.workflow.entities import PipelineVariable
@@ -701,6 +704,7 @@ def compile(
         """Build the artifacts and generate the training job request."""
         from sagemaker.workflow.properties import Properties
         from sagemaker.workflow.parameters import Parameter
+        from sagemaker.workflow.functions import Join
         from sagemaker.workflow.execution_variables import ExecutionVariables, ExecutionVariable
         from sagemaker.workflow.utilities import load_step_compilation_context
 
@@ -760,7 +764,19 @@ def compile(
             job_settings=job_settings, s3_base_uri=s3_base_uri
         )
 
-        output_config = {"S3OutputPath": s3_base_uri}
+        if step_compilation_context:
+            s3_output_path = Join(
+                on="/",
+                values=[
+                    s3_base_uri,
+                    ExecutionVariables.PIPELINE_EXECUTION_ID,
+                    step_compilation_context.step_name,
+                    "results",
+                ],
+            )
+            output_config = {"S3OutputPath": s3_output_path}
+        else:
+            output_config = {"S3OutputPath": s3_base_uri}
         if job_settings.s3_kms_key is not None:
             output_config["KmsKeyId"] = job_settings.s3_kms_key
         request_dict["OutputDataConfig"] = output_config
@@ -804,6 +820,11 @@ def compile(
             if isinstance(arg, (Parameter, ExecutionVariable, Properties)):
                 container_args.extend([arg.expr["Get"], arg.to_string()])
 
+            if isinstance(arg, DelayedReturn):
+                # The uri is a Properties object
+                uri = get_step(arg)._properties.OutputDataConfig.S3OutputPath
+                container_args.extend([uri.expr["Get"], uri.to_string()])
+
         if run_info is not None:
             container_args.extend(["--run_in_context", json.dumps(dataclasses.asdict(run_info))])
         elif _RunContext.get_current_run() is not None:
diff --git a/src/sagemaker/remote_function/runtime_environment/runtime_environment_manager.py b/src/sagemaker/remote_function/runtime_environment/runtime_environment_manager.py
index 0affa9beac..97ca4f08e4 100644
--- a/src/sagemaker/remote_function/runtime_environment/runtime_environment_manager.py
+++ b/src/sagemaker/remote_function/runtime_environment/runtime_environment_manager.py
@@ -252,7 +252,7 @@ def _is_file_exists(self, dependencies):
 
     def _install_requirements_txt(self, local_path, python_executable):
         """Install requirements.txt file"""
-        cmd = f"{python_executable} -m pip install -r {local_path}"
+        cmd = f"{python_executable} -m pip install -r {local_path} -U"
         logger.info("Running command: '%s' in the dir: '%s' ", cmd, os.getcwd())
         _run_shell_cmd(cmd)
         logger.info("Command %s ran successfully", cmd)
@@ -268,7 +268,7 @@ def _create_conda_env(self, env_name, local_path):
 
     def _install_req_txt_in_conda_env(self, env_name, local_path):
         """Install requirements.txt in the given conda environment"""
-        cmd = f"{self._get_conda_exe()} run -n {env_name} pip install -r {local_path}"
+        cmd = f"{self._get_conda_exe()} run -n {env_name} pip install -r {local_path} -U"
         logger.info("Activating conda env and installing requirements: %s", cmd)
         _run_shell_cmd(cmd)
         logger.info("Requirements installed successfully in conda env %s", env_name)
diff --git a/src/sagemaker/workflow/function_step.py b/src/sagemaker/workflow/function_step.py
index a55955b4eb..55e7eac90c 100644
--- a/src/sagemaker/workflow/function_step.py
+++ b/src/sagemaker/workflow/function_step.py
@@ -34,6 +34,7 @@
 )
 
 from sagemaker.workflow.execution_variables import ExecutionVariables
+from sagemaker.workflow.properties import Properties
 from sagemaker.workflow.retry import RetryPolicy
 from sagemaker.workflow.steps import Step, ConfigurableRetryStep, StepTypeEnum
 from sagemaker.workflow.step_collections import StepCollection
@@ -101,6 +102,12 @@ def __init__(
 
         self.__job_settings = None
 
+        # This is for internal use, to retrieve the execution id from the step properties.
+        # However, the properties of a function step are not exposed to customers.
+        self._properties = Properties(
+            step_name=name, step=self, shape_name="DescribeTrainingJobResponse"
+        )
+
         (
             self._converted_func_args,
             self._converted_func_kwargs,
diff --git a/src/sagemaker/workflow/pipeline.py b/src/sagemaker/workflow/pipeline.py
index 0645e58386..6800f2a3ac 100644
--- a/src/sagemaker/workflow/pipeline.py
+++ b/src/sagemaker/workflow/pipeline.py
@@ -1039,11 +1039,19 @@ def get_function_step_result(
         raise ValueError(_ERROR_MSG_OF_WRONG_STEP_TYPE)
 
     s3_output_path = describe_training_job_response["OutputDataConfig"]["S3OutputPath"]
+    s3_uri_suffix = s3_path_join(execution_id, step_name, RESULTS_FOLDER)
+    if s3_output_path.endswith(s3_uri_suffix) or s3_output_path[0:-1].endswith(s3_uri_suffix):
+        s3_uri = s3_output_path
+    else:
+        # This is the obsolete version of s3_output_path.
+        # Keep it for backward compatibility.
+        s3_uri = s3_path_join(s3_output_path, s3_uri_suffix)
+
     job_status = describe_training_job_response["TrainingJobStatus"]
     if job_status == "Completed":
         return deserialize_obj_from_s3(
             sagemaker_session=sagemaker_session,
-            s3_uri=s3_path_join(s3_output_path, execution_id, step_name, RESULTS_FOLDER),
+            s3_uri=s3_uri,
             hmac_key=describe_training_job_response["Environment"]["REMOTE_FUNCTION_SECRET_KEY"],
         )
 
diff --git a/tests/integ/sagemaker/workflow/helpers.py b/tests/integ/sagemaker/workflow/helpers.py
index 40681b9ac7..48e1e95734 100644
--- a/tests/integ/sagemaker/workflow/helpers.py
+++ b/tests/integ/sagemaker/workflow/helpers.py
@@ -39,18 +39,24 @@ def create_and_execute_pipeline(
     step_result_type=None,
     step_result_value=None,
     wait_duration=400,  # seconds
+    selective_execution_config=None,
 ):
-    response = pipeline.create(role)
-
-    create_arn = response["PipelineArn"]
-    assert re.match(
-        rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}",
-        create_arn,
+    create_arn = None
+    if not selective_execution_config:
+        response = pipeline.create(role)
+        create_arn = response["PipelineArn"]
+        assert re.match(
+            rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}",
+            create_arn,
+        )
+
+    execution = pipeline.start(
+        parameters=execution_parameters, selective_execution_config=selective_execution_config
     )
 
-    execution = pipeline.start(parameters=execution_parameters)
-    response = execution.describe()
-    assert response["PipelineArn"] == create_arn
+    if create_arn:
+        response = execution.describe()
+        assert response["PipelineArn"] == create_arn
 
     wait_pipeline_execution(execution=execution, delay=20, max_attempts=int(wait_duration / 20))
@@ -71,6 +77,16 @@ def create_and_execute_pipeline(
     if step_result_value:
         result = execution.result(execution_steps[0]["StepName"])
         assert result == step_result_value, f"Expected {step_result_value}, instead found {result}"
+
+    if selective_execution_config:
+        for exe_step in execution_steps:
+            if exe_step["StepName"] in selective_execution_config.selected_steps:
+                continue
+            assert (
+                exe_step["SelectiveExecutionResult"]["SourcePipelineExecutionArn"]
+                == selective_execution_config.source_pipeline_execution_arn
+            )
+
     return execution, execution_steps
diff --git a/tests/integ/sagemaker/workflow/test_selective_execution.py b/tests/integ/sagemaker/workflow/test_selective_execution.py
new file mode 100644
index 0000000000..a2c0286c6a
--- /dev/null
+++ b/tests/integ/sagemaker/workflow/test_selective_execution.py
@@ -0,0 +1,201 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+#     http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+from __future__ import absolute_import
+
+import os
+
+import pytest
+
+from tests.integ import DATA_DIR
+from sagemaker.sklearn import SKLearnProcessor
+from sagemaker.workflow.step_outputs import get_step
+
+from sagemaker.workflow.selective_execution_config import SelectiveExecutionConfig
+
+from tests.integ.sagemaker.workflow.helpers import create_and_execute_pipeline
+from sagemaker import utils, get_execution_role
+from sagemaker.workflow.function_step import step
+from sagemaker.workflow.pipeline import Pipeline
+from sagemaker.workflow.steps import ProcessingStep
+
+INSTANCE_TYPE = "ml.m5.large"
+
+
+@pytest.fixture
+def role(sagemaker_session):
+    return get_execution_role(sagemaker_session)
+
+
+@pytest.fixture
+def region_name(sagemaker_session):
+    return sagemaker_session.boto_session.region_name
+
+
+@pytest.fixture
+def pipeline_name():
+    return utils.unique_name_from_base("Selective-Pipeline")
+
+
+def test_selective_execution_among_pure_function_steps(
+    sagemaker_session, role, pipeline_name, region_name, dummy_container_without_error
+):
+    # Test Selective Pipeline Execution on function step1 -> [select: function step2]
+    os.environ["AWS_DEFAULT_REGION"] = region_name
+
+    step_settings = dict(
+        role=role,
+        instance_type=INSTANCE_TYPE,
+        image_uri=dummy_container_without_error,
+        keep_alive_period_in_seconds=60,
+    )
+
+    @step(**step_settings)
+    def generator() -> tuple:
+        return 3, 4
+
+    @step(**step_settings)
+    def sum(a, b):
+        """adds two numbers"""
+        return a + b
+
+    step_output_a = generator()
+    step_output_b = sum(step_output_a[0], step_output_a[1])
+
+    pipeline = Pipeline(
+        name=pipeline_name,
+        steps=[step_output_b],
+        sagemaker_session=sagemaker_session,
+    )
+
+    try:
+        execution, _ = create_and_execute_pipeline(
+            pipeline=pipeline,
+            pipeline_name=pipeline_name,
+            region_name=region_name,
+            role=role,
+            no_of_steps=2,
+            last_step_name="sum",
+            execution_parameters=dict(),
+            step_status="Succeeded",
+            step_result_type=int,
+            step_result_value=7,
+        )
+
+        create_and_execute_pipeline(
+            pipeline=pipeline,
+            pipeline_name=pipeline_name,
+            region_name=region_name,
+            role=role,
+            no_of_steps=2,
+            last_step_name="sum",
+            execution_parameters=dict(),
+            step_status="Succeeded",
+            step_result_type=int,
+            step_result_value=7,
+            selective_execution_config=SelectiveExecutionConfig(
+                source_pipeline_execution_arn=execution.arn,
+                selected_steps=[get_step(step_output_b).name],
+            ),
+        )
+
+    finally:
+        try:
+            pipeline.delete()
+        except Exception:
+            pass
+
+
+def test_selective_execution_of_regular_step_depended_by_function_step(
+    sagemaker_session,
+    role,
+    pipeline_name,
+    region_name,
+    dummy_container_without_error,
+    sklearn_latest_version,
+):
+    # Test Selective Pipeline Execution on regular step -> [select: function step]
+    os.environ["AWS_DEFAULT_REGION"] = region_name
+
+    script_path = os.path.join(DATA_DIR, "dummy_script.py")
+
+    sklearn_processor = SKLearnProcessor(
+        framework_version=sklearn_latest_version,
+        role=role,
+        instance_type=INSTANCE_TYPE,
+        instance_count=1,
+        command=["python3"],
+        sagemaker_session=sagemaker_session,
+        base_job_name="test-sklearn",
+    )
+
+    step_sklearn = ProcessingStep(
+        name="sklearn-process",
+        processor=sklearn_processor,
+        code=script_path,
+    )
+
+    @step(
+        role=role,
+        instance_type=INSTANCE_TYPE,
+        image_uri=dummy_container_without_error,
+        keep_alive_period_in_seconds=60,
+    )
+    def func_2(arg):
+        return arg
+
+    final_output = func_2(step_sklearn.properties.ProcessingJobStatus)
+
+    pipeline = Pipeline(
+        name=pipeline_name,
+        steps=[final_output],
+        sagemaker_session=sagemaker_session,
+    )
+
+    try:
+        execution, _ = create_and_execute_pipeline(
+            pipeline=pipeline,
+            pipeline_name=pipeline_name,
+            region_name=region_name,
+            role=role,
+            no_of_steps=2,
+            last_step_name="func",
+            execution_parameters=dict(),
+            step_status="Succeeded",
+            step_result_type=str,
+            step_result_value="Completed",
+            wait_duration=600,
+        )
+
+        create_and_execute_pipeline(
+            pipeline=pipeline,
+            pipeline_name=pipeline_name,
+            region_name=region_name,
+            role=role,
+            no_of_steps=2,
+            last_step_name="func",
+            execution_parameters=dict(),
+            step_status="Succeeded",
+            step_result_type=str,
+            step_result_value="Completed",
+            wait_duration=600,
+            selective_execution_config=SelectiveExecutionConfig(
+                source_pipeline_execution_arn=execution.arn,
+                selected_steps=[get_step(final_output).name],
+            ),
+        )
+
+    finally:
+        try:
+            pipeline.delete()
+        except Exception:
+            pass
diff --git a/tests/integ/sagemaker/workflow/test_step_decorator.py b/tests/integ/sagemaker/workflow/test_step_decorator.py
index 66f59956c3..bdd18a16f2 100644
--- a/tests/integ/sagemaker/workflow/test_step_decorator.py
+++ b/tests/integ/sagemaker/workflow/test_step_decorator.py
@@ -256,6 +256,7 @@ def sum(a, b):
             execution_parameters=dict(),
             step_status="Succeeded",
             step_result_type=int,
+            step_result_value=7,
         )
     finally:
         try:
@@ -784,12 +785,10 @@ def updated_func(x):
     pipeline.create(role)
 
     pipeline_definition = json.loads(pipeline.describe()["PipelineDefinition"])
-    s3_base_uri = pipeline_definition["Steps"][0]["Arguments"]["OutputDataConfig"][
-        "S3OutputPath"
-    ]
     step_container_args = pipeline_definition["Steps"][0]["Arguments"][
         "AlgorithmSpecification"
     ]["ContainerArguments"]
+    s3_base_uri = step_container_args[step_container_args.index("--s3_base_uri") + 1]
    build_time = step_container_args[step_container_args.index("--func_step_s3_dir") + 1]

    # some other user updates the pickled function code
diff --git a/tests/unit/sagemaker/remote_function/core/test_pipeline_variables.py b/tests/unit/sagemaker/remote_function/core/test_pipeline_variables.py
index ebe26653b8..422d1949af 100644
--- a/tests/unit/sagemaker/remote_function/core/test_pipeline_variables.py
+++ b/tests/unit/sagemaker/remote_function/core/test_pipeline_variables.py
@@ -28,6 +28,7 @@
     _DelayedReturnResolver,
     resolve_pipeline_variables,
     convert_pipeline_variables_to_pickleable,
+    _PropertiesResolver,
     _S3BaseUriIdentifier,
 )
 
@@ -47,31 +48,46 @@ def test_resolve_delayed_returns(mock_deserializer):
     delayed_returns = [
         _DelayedReturn(
-            uri=["s3://my-bucket/", "sub-folder-1/"], reference_path=(("__getitem__", 0),)
+            uri=_Properties("Steps.func1.OutputDataConfig.S3OutputPath"),
+            reference_path=(("__getitem__", 0),),
         ),
         _DelayedReturn(
-            uri=["s3://my-bucket/", "sub-folder-1/"], reference_path=(("__getitem__", 1),)
+            uri=_Properties("Steps.func1.OutputDataConfig.S3OutputPath"),
+            reference_path=(("__getitem__", 1),),
         ),
         _DelayedReturn(
-            uri=["s3://my-bucket/", "sub-folder-1/"], reference_path=(("__getitem__", 2),)
+            uri=_Properties("Steps.func1.OutputDataConfig.S3OutputPath"),
+            reference_path=(("__getitem__", 2),),
         ),
         _DelayedReturn(
-            uri=["s3://my-bucket/", "sub-folder-1/"],
+            uri=_Properties("Steps.func1.OutputDataConfig.S3OutputPath"),
             reference_path=(("__getitem__", 2), ("__getitem__", "key")),
         ),
         # index out of bounds
         _DelayedReturn(
-            uri=["s3://my-bucket/", "sub-folder-1/"], reference_path=(("__getitem__", 3),)
+            uri=_Properties("Steps.func1.OutputDataConfig.S3OutputPath"),
+            reference_path=(("__getitem__", 3),),
+        ),
+        _DelayedReturn(uri=_Properties("Steps.func2.OutputDataConfig.S3OutputPath")),
+        # the obsolete uri schema from old SDK versions
+        _DelayedReturn(
+            uri=["s3://my-bucket/", "sub-folder-1/"], reference_path=(("__getitem__", 0),)
         ),
-        _DelayedReturn(uri=["s3://my-bucket/", "sub-folder-2/"]),
     ]
 
     mock_deserializer.return_value = (1, 2, {"key": 3})
+    context = Context(
+        property_references={
+            "Steps.func1.OutputDataConfig.S3OutputPath": "s3://my_bucket/exe_id/sub_folder1",
+            "Steps.func2.OutputDataConfig.S3OutputPath": "s3://my_bucket/exe_id/sub_folder2",
+        }
+    )
     resolver = _DelayedReturnResolver(
         delayed_returns,
         "1234",
-        _ParameterResolver(Context()),
-        _ExecutionVariableResolver(Context()),
+        properties_resolver=_PropertiesResolver(context),
+        parameter_resolver=_ParameterResolver(context),
+        execution_variable_resolver=_ExecutionVariableResolver(context),
         sagemaker_session=None,
         s3_base_uri=f"s3://my-bucket/{PIPELINE_NAME}",
     )
@@ -83,25 +99,34 @@ def test_resolve_delayed_returns(mock_deserializer):
     with pytest.raises(IndexError):
         resolver.resolve(delayed_returns[4])
     assert resolver.resolve(delayed_returns[5]) == (1, 2, {"key": 3})
-    assert mock_deserializer.call_count == 2
+    assert resolver.resolve(delayed_returns[6]) == 1
+    assert mock_deserializer.call_count == 3
 
 
 @patch("sagemaker.remote_function.core.pipeline_variables.deserialize_obj_from_s3")
 def test_deserializer_fails(mock_deserializer):
     delayed_returns = [
         _DelayedReturn(
-            uri=["s3://my-bucket/", "sub-folder-1/"], reference_path=(("__getitem__", 0),)
+            uri=_Properties("Steps.func1.OutputDataConfig.S3OutputPath"),
+            reference_path=(("__getitem__", 0),),
         ),
-        _DelayedReturn(uri=["s3://my-bucket/", "sub-folder-2/"]),
+        _DelayedReturn(uri=_Properties("Steps.func2.OutputDataConfig.S3OutputPath")),
     ]
 
     mock_deserializer.side_effect = Exception("Something went wrong")
+    context = Context(
+        property_references={
+            "Steps.func1.OutputDataConfig.S3OutputPath": "s3://my_bucket/exe_id/sub_folder1",
+            "Steps.func2.OutputDataConfig.S3OutputPath": "s3://my_bucket/exe_id/sub_folder2",
+        }
+    )
 
     with pytest.raises(Exception, match="Something went wrong"):
         _DelayedReturnResolver(
             delayed_returns,
             "1234",
-            _ParameterResolver(Context()),
-            _ExecutionVariableResolver(Context()),
+            properties_resolver=_PropertiesResolver(context),
+            parameter_resolver=_ParameterResolver(context),
+            execution_variable_resolver=_ExecutionVariableResolver(context),
             sagemaker_session=None,
             s3_base_uri=f"s3://my-bucket/{PIPELINE_NAME}",
         )
@@ -142,6 +167,15 @@ def test_no_pipeline_variables_to_resolve(mock_deserializer, func_args, func_kwa
             _ParameterString("parameter_3"),
             _ParameterFloat("parameter_2"),
             _ParameterBoolean("parameter_4"),
+            _DelayedReturn(
+                uri=_Properties("Steps.func1.OutputDataConfig.S3OutputPath"),
+                reference_path=(("__getitem__", 0),),
+            ),
+            _DelayedReturn(
+                uri=_Properties("Steps.func1.OutputDataConfig.S3OutputPath"),
+                reference_path=(("__getitem__", 1),),
+            ),
+            # obsolete uri schema from old SDK versions
             _DelayedReturn(
                 uri=[
                     _S3BaseUriIdentifier(),
@@ -161,7 +195,7 @@ def test_no_pipeline_variables_to_resolve(mock_deserializer, func_args, func_kwa
                 _Properties("Steps.step_name.TrainingJobName"),
             ),
             {},
-            (1, "string", 2.0, True, 1.0, 2.0, "a-cool-name"),
+            (1, "string", 2.0, True, 1.0, 2.0, 1.0, 2.0, "a-cool-name"),
             {},
         ),
         (
@@ -172,6 +206,16 @@ def test_no_pipeline_variables_to_resolve(mock_deserializer, func_args, func_kwa
                 "c": _ParameterFloat("parameter_2"),
                 "d": _ParameterBoolean("parameter_4"),
                 "e": _DelayedReturn(
+                    uri=_Properties("Steps.func1.OutputDataConfig.S3OutputPath"),
+                    reference_path=(("__getitem__", 0),),
+                ),
+                "f": _DelayedReturn(
+                    uri=_Properties("Steps.func1.OutputDataConfig.S3OutputPath"),
+                    reference_path=(("__getitem__", 1),),
+                ),
+                "g": _Properties("Steps.step_name.TrainingJobName"),
+                # obsolete uri schema from old SDK versions
+                "h": _DelayedReturn(
                     uri=[
                         _S3BaseUriIdentifier(),
                         _ExecutionVariable("ExecutionId"),
@@ -179,7 +223,7 @@ def test_no_pipeline_variables_to_resolve(mock_deserializer, func_args, func_kwa
                     ],
                     reference_path=(("__getitem__", 0),),
                 ),
-                "f": _DelayedReturn(
+                "i": _DelayedReturn(
                     uri=[
                         _S3BaseUriIdentifier(),
                         _ExecutionVariable("ExecutionId"),
@@ -187,7 +231,6 @@ def test_no_pipeline_variables_to_resolve(mock_deserializer, func_args, func_kwa
                     ],
                     reference_path=(("__getitem__", 1),),
                 ),
-                "g": _Properties("Steps.step_name.TrainingJobName"),
             },
             (),
             {
@@ -198,6 +241,8 @@ def test_no_pipeline_variables_to_resolve(mock_deserializer, func_args, func_kwa
                "a": 1,
                "b": "string",
                "c": 2.0,
                "d": True,
                "e": 1.0,
                "f": 2.0,
                "g": "a-cool-name",
+                "h": 1.0,
+                "i": 2.0,
            },
        ),
    ],
)
@@ -211,6 +256,7 @@ def test_resolve_pipeline_variables(
    expected_resolved_kwargs,
 ):
     s3_base_uri = f"s3://my-bucket/{PIPELINE_NAME}"
+    s3_results_uri = f"{s3_base_uri}/execution-id/sub-folder-1"
     context = Context(
         property_references={
             "Parameters.parameter_1": "1",
             "Parameters.parameter_2": "2.0",
             "Parameters.parameter_3": "string",
             "Parameters.parameter_4": "true",
             "Execution.ExecutionId": "execution-id",
+            "Steps.func1.OutputDataConfig.S3OutputPath": s3_results_uri,
             "Steps.step_name.TrainingJobName": "a-cool-name",
         },
     )
@@ -237,7 +284,7 @@ def test_resolve_pipeline_variables(
     assert resolved_kwargs == expected_resolved_kwargs
     mock_deserializer.assert_called_once_with(
         sagemaker_session=None,
-        s3_uri=f"{s3_base_uri}/execution-id/sub-folder-1",
+        s3_uri=s3_results_uri,
         hmac_key="1234",
     )
@@ -245,6 +292,9 @@ def test_resolve_pipeline_variables(
 def test_convert_pipeline_variables_to_pickleable():
     function_step = Mock()
     function_step.name = "parent_step"
+    function_step._properties.OutputDataConfig.S3OutputPath = Properties(
+        step_name=function_step.name, path="OutputDataConfig.S3OutputPath"
+    )
     func_args = (
         DelayedReturn(function_step, reference_path=("__getitem__", 0)),
         ParameterBoolean("parameter_1"),
@@ -274,12 +324,7 @@ def test_convert_pipeline_variables_to_pickleable():
     assert converted_args == (
         _DelayedReturn(
-            uri=[
-                _S3BaseUriIdentifier(),
-                _ExecutionVariable(name="PipelineExecutionId"),
-                "parent_step",
-                "results",
-            ],
+            uri=_Properties(f"Steps.{function_step.name}.OutputDataConfig.S3OutputPath"),
             reference_path=("__getitem__", 0),
         ),
         _ParameterBoolean(name="parameter_1"),
@@ -293,12 +338,7 @@ def test_convert_pipeline_variables_to_pickleable():
 
     assert converted_kwargs == {
         "a": _DelayedReturn(
-            uri=[
-                _S3BaseUriIdentifier(),
-                _ExecutionVariable(name="PipelineExecutionId"),
-                "parent_step",
-                "results",
-            ],
+            uri=_Properties(f"Steps.{function_step.name}.OutputDataConfig.S3OutputPath"),
             reference_path=("__getitem__", 1),
         ),
         "b": _ParameterBoolean(name="parameter_1"),
diff --git a/tests/unit/sagemaker/remote_function/core/test_stored_function.py b/tests/unit/sagemaker/remote_function/core/test_stored_function.py
index bcc09cb585..b263682641 100644
--- a/tests/unit/sagemaker/remote_function/core/test_stored_function.py
+++ b/tests/unit/sagemaker/remote_function/core/test_stored_function.py
@@ -315,12 +315,11 @@ def test_load_and_invoke_json_serialization(
 def test_save_and_load_with_pipeline_variable(monkeypatch):
     session = Mock()
     s3_base_uri = random_s3_uri()
+    func1_result_path = f"{s3_base_uri}/execution-id/func_1/results"
     function_step = _FunctionStep(name="func_1", display_name=None, description=None)
     x = DelayedReturn(function_step=function_step)
 
-    serialize_obj_to_s3(
-        3.0, session, f"{s3_base_uri}/execution-id/func_1/results", HMAC_KEY, KMS_KEY
-    )
+    serialize_obj_to_s3(3.0, session, func1_result_path, HMAC_KEY, KMS_KEY)
 
     stored_function = StoredFunction(
         sagemaker_session=session,
@@ -332,8 +331,10 @@ def test_save_and_load_with_pipeline_variable(monkeypatch):
                 "Parameters.a": "1.0",
                 "Parameters.b": "2.0",
                 "Parameters.c": "3.0",
-                "Execution.PipelineExecutionId": "execution-id",
+                "Steps.func_1.OutputDataConfig.S3OutputPath": func1_result_path,
             },
+            execution_id="execution-id",
+            step_name="func_2",
         ),
     )
@@ -354,8 +355,9 @@ def test_save_and_load_with_pipeline_variable(monkeypatch):
     stored_function.save_pipeline_step_function(test_serialized_data)
     stored_function.load_and_invoke()
 
+    func2_result_path = f"{s3_base_uri}/execution-id/func_2/results"
     assert deserialize_obj_from_s3(
-        session, s3_uri=f"{s3_base_uri}/results", hmac_key=HMAC_KEY
+        session, s3_uri=func2_result_path, hmac_key=HMAC_KEY
     ) == quadratic(3.0, a=1.0, b=2.0, c=3.0)
diff --git a/tests/unit/sagemaker/remote_function/runtime_environment/test_runtime_environment_manager.py b/tests/unit/sagemaker/remote_function/runtime_environment/test_runtime_environment_manager.py
index afbcfb1ec5..45198f3388 100644
--- a/tests/unit/sagemaker/remote_function/runtime_environment/test_runtime_environment_manager.py
+++ b/tests/unit/sagemaker/remote_function/runtime_environment/test_runtime_environment_manager.py
@@ -208,7 +208,7 @@ def test_bootstrap_req_txt():
         call_args = popen.call_args[0][0]
         assert call_args is not None
 
-        expected_cmd = "{} -m pip install -r {}".format(python_exe, TEST_REQUIREMENTS_TXT)
+        expected_cmd = "{} -m pip install -r {} -U".format(python_exe, TEST_REQUIREMENTS_TXT)
         assert call_args == expected_cmd
@@ -229,7 +229,7 @@ def test_bootstrap_req_txt_error():
         call_args = popen.call_args[0][0]
         assert call_args is not None
 
-        expected_cmd = "{} -m pip install -r {}".format(python_exe, TEST_REQUIREMENTS_TXT)
+        expected_cmd = "{} -m pip install -r {} -U".format(python_exe, TEST_REQUIREMENTS_TXT)
         assert call_args == expected_cmd
@@ -260,7 +260,7 @@ def test_bootstrap_req_txt_with_conda_env(mock_conda_exe):
         call_args = popen.call_args[0][0]
         assert call_args is not None
 
-        expected_cmd = f"{mock_conda_exe.return_value} run -n conda_env pip install -r usr/local/requirements.txt"
+        expected_cmd = f"{mock_conda_exe.return_value} run -n conda_env pip install -r usr/local/requirements.txt -U"
         assert call_args == expected_cmd
diff --git a/tests/unit/sagemaker/remote_function/test_job.py b/tests/unit/sagemaker/remote_function/test_job.py
index 1884486f8b..ac321d4de0 100644
--- a/tests/unit/sagemaker/remote_function/test_job.py
+++ b/tests/unit/sagemaker/remote_function/test_job.py
@@ -27,10 +27,13 @@
 from sagemaker.remote_function.spark_config import SparkConfig
 from sagemaker.remote_function.custom_file_filter import CustomFileFilter
 from sagemaker.remote_function.core.pipeline_variables import Context
+from sagemaker.workflow.function_step import DelayedReturn
+from sagemaker.workflow.functions import Join
 from sagemaker.workflow.pipeline_context import _PipelineConfig
 from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig
 from sagemaker.workflow.execution_variables import ExecutionVariables
 from sagemaker.utils import sagemaker_timestamp
+from sagemaker.workflow.properties import Properties
 from tests.unit import DATA_DIR
 
 from sagemaker.remote_function.job import (
@@ -763,6 +766,13 @@ def test_get_train_args_under_pipeline_context(
     mock_stored_function = Mock()
     mock_stored_function_ctr.return_value = mock_stored_function
 
+    function_step = Mock()
+    function_step.name = "parent_step"
+    func_step_s3_output_prop = Properties(
+        step_name=function_step.name, path="OutputDataConfig.S3OutputPath"
+    )
+    function_step._properties.OutputDataConfig.S3OutputPath = func_step_s3_output_prop
+
     job_settings = _JobSettings(
         dependencies="path/to/dependencies/req.txt",
         pre_execution_script="path/to/script.sh",
@@ -787,8 +797,16 @@ def test_get_train_args_under_pipeline_context(
         job_name=TEST_JOB_NAME,
         s3_base_uri=s3_base_uri,
         func=job_function,
-        func_args=(1, ParameterInteger(name="b", default_value=2)),
-        func_kwargs={"c": 3, "d": ParameterInteger(name="d", default_value=4)},
+        func_args=(
+            1,
+            ParameterInteger(name="b", default_value=2),
+            DelayedReturn(function_step, reference_path=("__getitem__", 0)),
+        ),
+        func_kwargs={
+            "c": 3,
+            "d": ParameterInteger(name="d", default_value=4),
+            "e": DelayedReturn(function_step, reference_path=("__getitem__", 1)),
+        },
         serialized_data=mocked_serialized_data,
     )
@@ -862,7 +880,15 @@ def test_get_train_args_under_pipeline_context(
             ),
         ],
         OutputDataConfig={
-            "S3OutputPath": f"{S3_URI}/{TEST_PIPELINE_NAME}",
+            "S3OutputPath": Join(
+                on="/",
+                values=[
+                    "s3://my-s3-bucket/keyprefix/my-pipeline",
+                    ExecutionVariables.PIPELINE_EXECUTION_ID,
+                    "test-function-step",
+                    "results",
+                ],
+            ),
             "KmsKeyId": KMS_KEY_ARN,
         },
         AlgorithmSpecification=dict(
@@ -896,8 +922,12 @@ def test_get_train_args_under_pipeline_context(
                 ExecutionVariables.PIPELINE_EXECUTION_ID,
                 "Parameters.b",
                 ParameterInteger(name="b", default_value=2).to_string(),
+                "Steps.parent_step.OutputDataConfig.S3OutputPath",
+                func_step_s3_output_prop.to_string(),
                 "Parameters.d",
                 ParameterInteger(name="d", default_value=4).to_string(),
+                "Steps.parent_step.OutputDataConfig.S3OutputPath",
+                func_step_s3_output_prop.to_string(),
             ],
         ),
         ResourceConfig=dict(
diff --git a/tests/unit/sagemaker/workflow/test_pipeline.py b/tests/unit/sagemaker/workflow/test_pipeline.py
index 136b85fc49..d658455d62 100644
--- a/tests/unit/sagemaker/workflow/test_pipeline.py
+++ b/tests/unit/sagemaker/workflow/test_pipeline.py
@@ -19,6 +19,7 @@
 import pytest
 
 from mock import Mock, call, patch
+from mock.mock import MagicMock
 
 from sagemaker import s3
 from sagemaker.remote_function.job import _JobSettings
@@ -198,12 +199,7 @@ def test_large_pipeline_create(sagemaker_session_mock, role_arn):
     )
 
 
-@patch("botocore.waiter.create_waiter_with_client")
-@patch("sagemaker.workflow.pipeline.deserialize_obj_from_s3")
-@patch("sagemaker.s3.S3Downloader")
-def test_pipeline_update(
-    waiter_mock, deserializer_mock, s3_downloader_mock, sagemaker_session_mock, role_arn
-):
+def test_pipeline_update(sagemaker_session_mock, role_arn):
     sagemaker_session_mock.sagemaker_config = {}
     pipeline = Pipeline(
         name="MyPipeline",
@@ -262,6 +258,46 @@ def test_pipeline_update(sagemaker_session_mock, role_arn):
         PipelineName="MyPipeline", PipelineDefinition=pipeline.definition(), RoleArn=role_arn
     )
 
+
+@pytest.mark.parametrize(
+    "s3_output_path, is_complete_path",
+    [
+        ("s3:/my-bucket/my-key", False),
+        ("s3:/my-bucket/my-key/myexecution/stepA/results", True),
+        ("s3:/my-bucket/my-key/myexecution/stepA/results/", True),
+    ],
+)
+@patch("botocore.waiter.create_waiter_with_client", MagicMock())
+@patch("sagemaker.workflow.pipeline.deserialize_obj_from_s3")
+def test_pipeline_execution_result(
+    mock_deserialize, s3_output_path, is_complete_path, sagemaker_session_mock, role_arn
+):
+    sagemaker_session_mock.sagemaker_config = {}
+
+    step1 = CustomStep(name="MyStep1")
+    dr_step_2 = DelayedReturn(
+        function_step=CustomFunctionStep(
+            func_args=(1, 2),
+            func=lambda x, y: x + y + 3,
+            job_settings=_JobSettings(
+                image_uri="image",
+                instance_type="ml.m4.xlarge",
+                sagemaker_session=sagemaker_session_mock,
+            ),
+            name="stepA",
+            description="",
+            display_name="",
+            depends_on=[step1],
+        )
+    )
+    pipeline = Pipeline(
+        name="MyPipeline",
+        parameters=[],
+        steps=[dr_step_2],
+        sagemaker_session=sagemaker_session_mock,
+    )
+    pipeline.create(role_arn=role_arn)
+
     sagemaker_session_mock.sagemaker_client.start_pipeline_execution.return_value = {
         "PipelineExecutionArn": "arn:aws:sagemaker:us-west-2:111111111111:pipeline/mypipeline/execution/myexecution",
     }
@@ -270,7 +306,7 @@
     sagemaker_session_mock.sagemaker_client.list_pipeline_execution_steps.return_value = {
         "PipelineExecutionSteps": [
             {
-                "StepName": "stepC",
+                "StepName": "stepA",
                 "Metadata": {
                     "TrainingJob": {
                         "Arn": "arn:aws:sagemaker:us-west-2:111111111111:training-job/foo"
@@ -288,16 +324,15 @@
             ]
         },
         "TrainingJobStatus": "Completed",
-        "OutputDataConfig": {"S3OutputPath": "s3:/my-bucket/my-key"},
+        "OutputDataConfig": {"S3OutputPath": s3_output_path},
         "Environment": {"REMOTE_FUNCTION_SECRET_KEY": "abcdefg"},
     }
-    execution.result("stepC")
-    assert s3_downloader_mock.read_bytes(
-        "s3:/my-bucket/my-key/myexecution/stepC/results/metadata.json"
-    )
-    assert s3_downloader_mock.read_bytes(
-        "s3:/my-bucket/my-key/myexecution/stepC/results/payload.pkl"
+    execution.result("stepA")
+
+    expected_s3_uri = (
+        s3_output_path if is_complete_path else f"{s3_output_path}/myexecution/stepA/results"
     )
+    assert mock_deserialize.call_args.kwargs["s3_uri"] == expected_s3_uri


 def test_pipeline_update_with_parallelism_config(sagemaker_session_mock, role_arn):