From 6cd0cb3d56d515785a2bf20cdbae7bd636383cb3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?jerrypeng73=F0=9F=98=8E?=
Date: Thu, 6 Jan 2022 17:07:39 -0800
Subject: [PATCH] support JsonGet/Join parameterization in tuning step
 hyperparameter ranges and static hyperparameters

---
 src/sagemaker/parameter.py   | 15 +++++++++------
 src/sagemaker/tuner.py       | 21 ++++++++++++++++++++-
 tests/integ/test_workflow.py | 54 ++++++++++++++++++++++++++++++++++++++++++++++++-------
 tests/unit/test_tuner.py     | 32 +++++++++++++++++++++++++++++---
 4 files changed, 105 insertions(+), 17 deletions(-)

diff --git a/src/sagemaker/parameter.py b/src/sagemaker/parameter.py
index 3b7ef1d4bb..a7f8440f3d 100644
--- a/src/sagemaker/parameter.py
+++ b/src/sagemaker/parameter.py
@@ -15,6 +15,8 @@
 import json
 
 from sagemaker.workflow.parameters import Parameter as PipelineParameter
+from sagemaker.workflow.functions import JsonGet as PipelineJsonGet
+from sagemaker.workflow.functions import Join as PipelineJoin
 
 
 class ParameterRange(object):
@@ -71,10 +73,10 @@ def as_tuning_range(self, name):
         return {
             "Name": name,
             "MinValue": str(self.min_value)
-            if not isinstance(self.min_value, PipelineParameter)
+            if not isinstance(self.min_value, (PipelineParameter, PipelineJsonGet, PipelineJoin))
             else self.min_value,
             "MaxValue": str(self.max_value)
-            if not isinstance(self.max_value, PipelineParameter)
+            if not isinstance(self.max_value, (PipelineParameter, PipelineJsonGet, PipelineJoin))
             else self.max_value,
             "ScalingType": self.scaling_type,
         }
@@ -108,10 +110,11 @@ def __init__(self, values):  # pylint: disable=super-init-not-called
             values (list or object): The possible values for the hyperparameter.
                 This input will be converted into a list of strings.
         """
-        if isinstance(values, list):
-            self.values = [str(v) if not isinstance(v, PipelineParameter) else v for v in values]
-        else:
-            self.values = [str(values) if not isinstance(values, PipelineParameter) else values]
+        values = values if isinstance(values, list) else [values]
+        self.values = [
+            str(v) if not isinstance(v, (PipelineParameter, PipelineJsonGet, PipelineJoin)) else v
+            for v in values
+        ]
 
     def as_tuning_range(self, name):
         """Represent the parameter range as a dictionary.
diff --git a/src/sagemaker/tuner.py b/src/sagemaker/tuner.py
index efa2617887..f661e26e04 100644
--- a/src/sagemaker/tuner.py
+++ b/src/sagemaker/tuner.py
@@ -38,6 +38,10 @@
     IntegerParameter,
     ParameterRange,
 )
+from sagemaker.workflow.parameters import Parameter as PipelineParameter
+from sagemaker.workflow.functions import JsonGet as PipelineJsonGet
+from sagemaker.workflow.functions import Join as PipelineJoin
+
 from sagemaker.session import Session
 from sagemaker.utils import base_from_name, base_name_from_image, name_from_base
 
@@ -59,6 +63,18 @@
 logger = logging.getLogger(__name__)
 
 
+def is_pipeline_parameters(value):
+    """Determine whether a value is a pipeline parameter or function representation.
+
+    Args:
+        value (object): The value to check.
+
+    Returns:
+        bool: True if the value is a pipeline Parameter, JsonGet, or Join; False otherwise.
+    """
+    return isinstance(value, (PipelineParameter, PipelineJsonGet, PipelineJoin))
+
+
 class WarmStartTypes(Enum):
     """Warm Start Configuration type.
@@ -359,7 +375,10 @@ def _prepare_static_hyperparameters(
     ):
         """Prepare static hyperparameters for one estimator before tuning."""
         # Remove any hyperparameter that will be tuned
-        static_hyperparameters = {str(k): str(v) for (k, v) in estimator.hyperparameters().items()}
+        static_hyperparameters = {
+            str(k): str(v) if not is_pipeline_parameters(v) else v
+            for (k, v) in estimator.hyperparameters().items()
+        }
         for hyperparameter_name in hyperparameter_ranges.keys():
             static_hyperparameters.pop(hyperparameter_name, None)
 
diff --git a/tests/integ/test_workflow.py b/tests/integ/test_workflow.py
index 58b681fd0e..de03608b27 100644
--- a/tests/integ/test_workflow.py
+++ b/tests/integ/test_workflow.py
@@ -66,14 +66,13 @@
     ConditionIn,
     ConditionLessThanOrEqualTo,
 )
-from sagemaker.workflow.condition_step import ConditionStep, JsonGet
+from sagemaker.workflow.condition_step import ConditionStep
 from sagemaker.workflow.callback_step import CallbackStep, CallbackOutput, CallbackOutputTypeEnum
 from sagemaker.workflow.lambda_step import LambdaStep, LambdaOutput, LambdaOutputTypeEnum
-from sagemaker.workflow.properties import PropertyFile
 from sagemaker.wrangler.processing import DataWranglerProcessor
 from sagemaker.dataset_definition.inputs import DatasetDefinition, AthenaDatasetDefinition
 from sagemaker.workflow.execution_variables import ExecutionVariables
-from sagemaker.workflow.functions import Join
+from sagemaker.workflow.functions import Join, JsonGet
 from sagemaker.wrangler.ingestion import generate_data_ingestion_flow_from_s3_input
 from sagemaker.workflow.parameters import (
     ParameterInteger,
@@ -87,6 +86,7 @@
     TuningStep,
     TransformStep,
     TransformInput,
+    PropertyFile,
 )
 from sagemaker.workflow.step_collections import RegisterModel
 from sagemaker.workflow.pipeline import Pipeline
@@ -137,7 +137,7 @@ def feature_store_session(sagemaker_session):
 
 @pytest.fixture
 def pipeline_name():
-    return f"my-pipeline-{int(time.time() * 10**7)}"
+    return f"my-pipeline-{int(time.time() * 10 ** 7)}"
 
 
 @pytest.fixture
@@ -1371,6 +1371,8 @@ def test_tuning_multi_algos(
     cpu_instance_type,
     pipeline_name,
     region_name,
+    script_dir,
+    athena_dataset_definition,
 ):
     base_dir = os.path.join(DATA_DIR, "pytorch_mnist")
     entry_point = os.path.join(base_dir, "mnist.py")
@@ -1382,6 +1384,42 @@ def test_tuning_multi_algos(
     instance_count = ParameterInteger(name="InstanceCount", default_value=1)
     instance_type = ParameterString(name="InstanceType", default_value="ml.m5.xlarge")
 
+    input_data = f"s3://sagemaker-sample-data-{region_name}/processing/census/census-income.csv"
+
+    sklearn_processor = SKLearnProcessor(
+        framework_version="0.20.0",
+        instance_type=instance_type,
+        instance_count=instance_count,
+        base_job_name="test-sklearn",
+        sagemaker_session=sagemaker_session,
+        role=role,
+    )
+
+    property_file = PropertyFile(
+        name="DataAttributes", output_name="attributes", path="attributes.json"
+    )
+
+    step_process = ProcessingStep(
+        name="my-process",
+        display_name="ProcessingStep",
+        description="description for Processing step",
+        processor=sklearn_processor,
+        inputs=[
+            ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
+            ProcessingInput(dataset_definition=athena_dataset_definition),
+        ],
+        outputs=[
+            ProcessingOutput(output_name="train_data", source="/opt/ml/processing/train"),
+            ProcessingOutput(output_name="attributes", source="/opt/ml/processing/attributes.json"),
+        ],
+        property_files=[property_file],
+        code=os.path.join(script_dir, "preprocessing.py"),
"preprocessing.py"), + ) + + static_hp_1 = ParameterString(name="InstanceType", default_value="ml.m5.xlarge") + json_get_hp = JsonGet( + step_name=step_process.name, property_file=property_file, json_path="train_size" + ) pytorch_estimator = PyTorch( entry_point=entry_point, role=role, @@ -1392,10 +1430,11 @@ def test_tuning_multi_algos( sagemaker_session=sagemaker_session, enable_sagemaker_metrics=True, max_retry_attempts=3, + hyperparameters={"static-hp": static_hp_1, "train_size": json_get_hp}, ) min_batch_size = ParameterString(name="MinBatchSize", default_value="64") - max_batch_size = ParameterString(name="MaxBatchSize", default_value="128") + max_batch_size = json_get_hp tuner = HyperparameterTuner.create( estimator_dict={ @@ -1415,6 +1454,7 @@ def test_tuning_multi_algos( "estimator-2": [{"Name": "test:acc", "Regex": "Overall test accuracy: (.*?);"}], }, ) + inputs = { "estimator-1": TrainingInput(s3_data=input_path), "estimator-2": TrainingInput(s3_data=input_path), @@ -1429,7 +1469,7 @@ def test_tuning_multi_algos( pipeline = Pipeline( name=pipeline_name, parameters=[instance_count, instance_type, min_batch_size, max_batch_size], - steps=[step_tune], + steps=[step_process, step_tune], sagemaker_session=sagemaker_session, ) diff --git a/tests/unit/test_tuner.py b/tests/unit/test_tuner.py index 2ae028e9ba..d6f1f5a648 100644 --- a/tests/unit/test_tuner.py +++ b/tests/unit/test_tuner.py @@ -30,6 +30,8 @@ create_transfer_learning_tuner, HyperparameterTuner, ) +from sagemaker.workflow.functions import JsonGet, Join +from sagemaker.workflow.parameters import ParameterString, ParameterInteger from .tuner_test_utils import * # noqa: F403 @@ -68,14 +70,24 @@ def tuner(estimator): def test_prepare_for_training(tuner): - static_hyperparameters = {"validated": 1, "another_one": 0} + hp1 = JsonGet(step_name="stepname", property_file="pf", json_path="jp") + hp2 = Join(on="/", values=["1", "2", ParameterString(name="ps", default_value="3")]) + + static_hyperparameters = { + "validated": 1, + "another_one": 0, + "hp1": hp1, + "hp2": hp2, + } + tuner.estimator.set_hyperparameters(**static_hyperparameters) tuner._prepare_for_tuning() assert tuner._current_job_name.startswith(IMAGE_NAME) - - assert len(tuner.static_hyperparameters) == 1 + assert len(tuner.static_hyperparameters) == 3 assert tuner.static_hyperparameters["another_one"] == "0" + assert tuner.static_hyperparameters["hp1"] == hp1 + assert tuner.static_hyperparameters["hp2"] == hp2 def test_prepare_for_tuning_with_amazon_estimator(tuner, sagemaker_session): @@ -1156,6 +1168,20 @@ def test_integer_parameter_ranges(): assert ranges["ScalingType"] == "Auto" +def test_integer_parameter_ranges_with_pipeline_parameter(): + min = ParameterInteger(name="p", default_value=2) + max = JsonGet(step_name="sn", property_file="pf", json_path="jp") + scale = ParameterString(name="scale", default_value="Auto") + int_param = IntegerParameter(min, max) + ranges = int_param.as_tuning_range("some") + + assert len(ranges.keys()) == 4 + assert ranges["Name"] == "some" + assert ranges["MinValue"] == min + assert ranges["MaxValue"] == max + assert ranges["ScalingType"] == scale + + def test_integer_parameter_scaling_type(): int_param = IntegerParameter(2, 3, scaling_type="Linear") int_range = int_param.as_tuning_range("range")