diff --git a/src/sagemaker/workflow/pipeline.py b/src/sagemaker/workflow/pipeline.py
index 15523ce881..c43fb5c837 100644
--- a/src/sagemaker/workflow/pipeline.py
+++ b/src/sagemaker/workflow/pipeline.py
@@ -294,7 +294,7 @@ def upsert(
             if not (error_code == "ValidationException" and "already exists" in error_message):
                 raise ce
             # already exists
-            response = self.update(role_arn, description)
+            response = self.update(role_arn, description, parallelism_config=parallelism_config)
             # add new tags to existing resource
             if tags is not None:
                 old_tags = self.sagemaker_session.sagemaker_client.list_tags(
diff --git a/tests/integ/sagemaker/workflow/test_workflow.py b/tests/integ/sagemaker/workflow/test_workflow.py
index 77022af0c3..462d774885 100644
--- a/tests/integ/sagemaker/workflow/test_workflow.py
+++ b/tests/integ/sagemaker/workflow/test_workflow.py
@@ -24,6 +24,7 @@
 
 import pandas as pd
 
+from sagemaker.utils import retry_with_backoff
 from tests.integ.sagemaker.workflow.helpers import wait_pipeline_execution
 from tests.integ.s3_utils import extract_files_from_s3
 from sagemaker.workflow.model_step import (
@@ -1002,7 +1003,7 @@
     assert response["ParallelismConfiguration"]["MaxParallelExecutionSteps"] == 50
 
     pipeline.parameters = [ParameterInteger(name="InstanceCount", default_value=1)]
-    response = pipeline.update(role, parallelism_config={"MaxParallelExecutionSteps": 55})
+    response = pipeline.upsert(role, parallelism_config={"MaxParallelExecutionSteps": 55})
     update_arn = response["PipelineArn"]
     assert re.match(
         rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline_name}",
@@ -1019,6 +1020,99 @@
         pass
 
 
+def test_create_and_start_without_parallelism_config_override(
+    pipeline_session, role, pipeline_name, script_dir
+):
+    sklearn_train = SKLearn(
+        framework_version="0.20.0",
+        entry_point=os.path.join(script_dir, "train.py"),
+        instance_type="ml.m5.xlarge",
+        sagemaker_session=pipeline_session,
+        role=role,
+    )
+
+    train_steps = [
+        TrainingStep(
+            name=f"my-train-{count}",
+            display_name="TrainingStep",
+            description="description for Training step",
+            step_args=sklearn_train.fit(),
+        )
+        for count in range(2)
+    ]
+    pipeline = Pipeline(
+        name=pipeline_name,
+        steps=train_steps,
+        sagemaker_session=pipeline_session,
+    )
+
+    try:
+        pipeline.create(role, parallelism_config=dict(MaxParallelExecutionSteps=1))
+        # No ParallelismConfiguration given in pipeline.start, so it won't override that in pipeline.create
+        execution = pipeline.start()
+
+        def validate():
+            # Only one step would be scheduled initially
+            assert len(execution.list_steps()) == 1
+
+        retry_with_backoff(validate, num_attempts=4)
+
+        wait_pipeline_execution(execution=execution)
+
+    finally:
+        try:
+            pipeline.delete()
+        except Exception:
+            pass
+
+
+def test_create_and_start_with_parallelism_config_override(
+    pipeline_session, role, pipeline_name, script_dir
+):
+    sklearn_train = SKLearn(
+        framework_version="0.20.0",
+        entry_point=os.path.join(script_dir, "train.py"),
+        instance_type="ml.m5.xlarge",
+        sagemaker_session=pipeline_session,
+        role=role,
+    )
+
+    train_steps = [
+        TrainingStep(
+            name=f"my-train-{count}",
+            display_name="TrainingStep",
+            description="description for Training step",
+            step_args=sklearn_train.fit(),
+        )
+        for count in range(2)
+    ]
+    pipeline = Pipeline(
+        name=pipeline_name,
+        steps=train_steps,
+        sagemaker_session=pipeline_session,
+    )
+
+    try:
+        pipeline.create(role, parallelism_config=dict(MaxParallelExecutionSteps=1))
+        # Override ParallelismConfiguration in pipeline.start
+        execution = pipeline.start(parallelism_config=dict(MaxParallelExecutionSteps=2))
+
+        def validate():
+            assert len(execution.list_steps()) == 2
+            for step in execution.list_steps():
+                assert step["StepStatus"] == "Executing"
+
+        retry_with_backoff(validate, num_attempts=4)
+
+        wait_pipeline_execution(execution=execution)
+
+    finally:
+        try:
+            pipeline.delete()
+        except Exception:
+            pass
+
+
 def test_model_registration_with_tuning_model(
     pipeline_session,
     role,
diff --git a/tests/unit/sagemaker/workflow/test_pipeline.py b/tests/unit/sagemaker/workflow/test_pipeline.py
index 9f9d1db870..4bee5ffd04 100644
--- a/tests/unit/sagemaker/workflow/test_pipeline.py
+++ b/tests/unit/sagemaker/workflow/test_pipeline.py
@@ -26,7 +26,6 @@
 from sagemaker.workflow.execution_variables import ExecutionVariables
 from sagemaker.workflow.parameters import ParameterString
 from sagemaker.workflow.pipeline import Pipeline, PipelineGraph
-from sagemaker.workflow.parallelism_config import ParallelismConfiguration
 from sagemaker.workflow.pipeline_experiment_config import (
     PipelineExperimentConfig,
     PipelineExperimentConfigProperties,
@@ -126,10 +125,12 @@ def test_pipeline_create_with_parallelism_config(sagemaker_session_mock, role_ar
         name="MyPipeline",
         parameters=[],
         steps=[],
-        pipeline_experiment_config=ParallelismConfiguration(max_parallel_execution_steps=10),
         sagemaker_session=sagemaker_session_mock,
     )
-    pipeline.create(role_arn=role_arn)
+    pipeline.create(
+        role_arn=role_arn,
+        parallelism_config=dict(MaxParallelExecutionSteps=10),
+    )
     assert sagemaker_session_mock.sagemaker_client.create_pipeline.called_with(
         PipelineName="MyPipeline",
         PipelineDefinition=pipeline.definition(),
         RoleArn=role_arn,
         ParallelismConfiguration={"MaxParallelExecutionSteps": 10},
     )
@@ -138,6 +139,42 @@
+def test_pipeline_create_and_start_with_parallelism_config(sagemaker_session_mock, role_arn):
+    pipeline = Pipeline(
+        name="MyPipeline",
+        parameters=[],
+        steps=[],
+        sagemaker_session=sagemaker_session_mock,
+    )
+    pipeline.create(
+        role_arn=role_arn,
+        parallelism_config=dict(MaxParallelExecutionSteps=10),
+    )
+    sagemaker_session_mock.sagemaker_client.create_pipeline.assert_called_with(
+        PipelineName="MyPipeline",
+        PipelineDefinition=pipeline.definition(),
+        RoleArn=role_arn,
+        ParallelismConfiguration={"MaxParallelExecutionSteps": 10},
+    )
+
+    sagemaker_session_mock.sagemaker_client.start_pipeline_execution.return_value = dict(
+        PipelineExecutionArn="pipeline-execution-arn"
+    )
+
+    # No ParallelismConfiguration specified
+    pipeline.start()
+    assert sagemaker_session_mock.sagemaker_client.start_pipeline_execution.call_args[1] == {
+        "PipelineName": "MyPipeline"
+    }
+
+    # Specify ParallelismConfiguration to another value which will be honored in backend
+    pipeline.start(parallelism_config=dict(MaxParallelExecutionSteps=20))
+    sagemaker_session_mock.sagemaker_client.start_pipeline_execution.assert_called_with(
+        PipelineName="MyPipeline",
+        ParallelismConfiguration={"MaxParallelExecutionSteps": 20},
+    )
+
+
 @patch("sagemaker.s3.S3Uploader.upload_string_as_file_body")
 def test_large_pipeline_create(sagemaker_session_mock, role_arn):
     sagemaker_session_mock.sagemaker_config = {}
@@ -200,10 +237,12 @@ def test_pipeline_update_with_parallelism_config(sagemaker_session_mock, role_ar
         name="MyPipeline",
         parameters=[],
         steps=[],
-        pipeline_experiment_config=ParallelismConfiguration(max_parallel_execution_steps=10),
         sagemaker_session=sagemaker_session_mock,
     )
-    pipeline.create(role_arn=role_arn)
+    pipeline.create(
+        role_arn=role_arn,
+        parallelism_config=dict(MaxParallelExecutionSteps=10),
+    )
     assert sagemaker_session_mock.sagemaker_client.update_pipeline.called_with(
         PipelineName="MyPipeline",
         PipelineDefinition=pipeline.definition(),