1111# ANY KIND, either express or implied. See the License for the specific
1212# language governing permissions and limitations under the License.
1313"""This module contains code related to the Processor class, which is used
14- for Processing jobs. These jobs let customers perform data pre-processing,
14+ for Processing jobs. These jobs let users perform data pre-processing,
1515post-processing, feature engineering, data validation, and model evaluation
1616and interpretation on SageMaker.
1717"""
@@ -148,7 +148,10 @@ def run(
148148 self .arguments = arguments
149149
150150 self .latest_job = ProcessingJob .start_new (
151- self , normalized_inputs , normalized_outputs , experiment_config
151+ processor = self ,
152+ inputs = normalized_inputs ,
153+ outputs = normalized_outputs ,
154+ experiment_config = experiment_config ,
152155 )
153156 self .jobs .append (self .latest_job )
154157 if wait :
@@ -260,6 +263,7 @@ def __init__(
260263 self ,
261264 role ,
262265 image_uri ,
266+ command ,
263267 instance_count ,
264268 instance_type ,
265269 volume_size_in_gb = 30 ,
@@ -283,11 +287,12 @@ def __init__(
283287 needs to access an AWS resource.
284288 image_uri (str): The uri of the image to use for the processing
285289 jobs started by the Processor.
290+ command ([str]): The command to run, along with any command-line flags.
291+ Example: ["python3", "-v"].
286292 instance_count (int): The number of instances to run
287293 the Processing job with.
288294 instance_type (str): Type of EC2 instance to use for
289295 processing, for example, 'ml.c4.xlarge'.
290- py_version (str): The python version to use, for example, 'py3'.
291296 volume_size_in_gb (int): Size in GB of the EBS volume
292297 to use for storing data during processing (default: 30).
293298 volume_kms_key (str): A KMS key for the processing
@@ -311,6 +316,7 @@ def __init__(
311316 """
312317 self ._CODE_CONTAINER_BASE_PATH = "/opt/ml/processing/input/"
313318 self ._CODE_CONTAINER_INPUT_NAME = "code"
319+ self .command = command
314320
315321 super (ScriptProcessor , self ).__init__ (
316322 role = role ,
@@ -330,9 +336,7 @@ def __init__(
330336
331337 def run (
332338 self ,
333- command ,
334339 code ,
335- script_name = None ,
336340 inputs = None ,
337341 outputs = None ,
338342 arguments = None ,
@@ -344,13 +348,8 @@ def run(
344348 """Run a processing job with Script Mode.
345349
346350 Args:
347- command([str]): This is a list of strings that includes the executable, along
348- with any command-line flags. For example: ["python3", "-v"]
349351 code (str): This can be an S3 uri or a local path to either
350352 a directory or a file with the user's script to run.
351- script_name (str): If the user provides a directory for source,
352- they must specify script_name as the file within that
353- directory to use.
354353 inputs ([sagemaker.processing.ProcessingInput]): Input files for the processing
355354 job. These must be provided as ProcessingInput objects.
356355 outputs ([str or sagemaker.processing.ProcessingOutput]): Outputs for the processing
@@ -369,50 +368,45 @@ def run(
369368 """
370369 self ._current_job_name = self ._generate_current_job_name (job_name = job_name )
371370
372- customer_script_name = self ._get_customer_script_name (code , script_name )
373- customer_code_s3_uri = self ._upload_code (code )
374- inputs_with_code = self ._convert_code_and_add_to_inputs (inputs , customer_code_s3_uri )
371+ user_script_name = self ._get_user_script_name (code )
372+ user_code_s3_uri = self ._upload_code (code )
373+ inputs_with_code = self ._convert_code_and_add_to_inputs (inputs , user_code_s3_uri )
375374
376- self ._set_entrypoint (command , customer_script_name )
375+ self ._set_entrypoint (self .command , user_script_name )
376+
377+ normalized_inputs = self ._normalize_inputs (inputs_with_code )
378+ normalized_outputs = self ._normalize_outputs (outputs )
379+ self .arguments = arguments
377380
378- super (ScriptProcessor , self ).run (
379- inputs = inputs_with_code ,
380- outputs = outputs ,
381- arguments = arguments ,
382- wait = wait ,
383- logs = logs ,
384- job_name = job_name ,
381+ self .latest_job = ProcessingJob .start_new (
382+ processor = self ,
383+ inputs = normalized_inputs ,
384+ outputs = normalized_outputs ,
385385 experiment_config = experiment_config ,
386386 )
387+ self .jobs .append (self .latest_job )
388+ if wait :
389+ self .latest_job .wait (logs = logs )
387390
388- def _get_customer_script_name (self , code , script_name ):
389- """Finds the customer script name using the provided code file,
391+ def _get_user_script_name (self , code ):
392+ """Finds the user script name using the provided code file,
390393 directory, or script name.
391394
392395 Args:
393396 code (str): This can be an S3 uri or a local path to either
394397 a directory or a file.
395- script_name (str): If the user provides a directory as source,
396- they must specify script_name as the file within that
397- directory to use.
398398
399399 Returns:
400400 str: The script name from the S3 uri or from the file found
401401 on the user's local machine.
402402 """
403- parse_result = urlparse (code )
404-
405- if os .path .isdir (code ) and script_name is None :
403+ if os .path .isdir (code ) is None or not os .path .splitext (code )[1 ]:
406404 raise ValueError (
407- """You provided a directory without providing a script name.
408- Please provide a script name inside the directory that you specified .
405+ """You cannot provide a directory. Please package your code inside of a .whl
406+ file and pass that in, instead .
409407 """
410408 )
411- if (parse_result .scheme == "s3" or os .path .isdir (code )) and script_name is not None :
412- return script_name
413- if parse_result .scheme == "s3" or os .path .isfile (code ):
414- return os .path .basename (code )
415- raise ValueError ("The file or directory you specified does not exist." )
409+ return os .path .basename (code )
416410
417411 def _upload_code (self , code ):
418412 """Uploads a code file or directory specified as a string
@@ -457,16 +451,16 @@ def _convert_code_and_add_to_inputs(self, inputs, s3_uri):
457451 )
458452 return (inputs or []) + [code_file_input ]
459453
460- def _set_entrypoint (self , command , customer_script_name ):
461- """Sets the entrypoint based on the customer 's script and corresponding executable.
454+ def _set_entrypoint (self , command , user_script_name ):
455+ """Sets the entrypoint based on the user 's script and corresponding executable.
462456
463457 Args:
464- customer_script_name (str): A filename with an extension.
458+ user_script_name (str): A filename with an extension.
465459 """
466- customer_script_location = os .path .join (
467- self ._CODE_CONTAINER_BASE_PATH , self ._CODE_CONTAINER_INPUT_NAME , customer_script_name
460+ user_script_location = os .path .join (
461+ self ._CODE_CONTAINER_BASE_PATH , self ._CODE_CONTAINER_INPUT_NAME , user_script_name
468462 )
469- self .entrypoint = command + [customer_script_location ]
463+ self .entrypoint = command + [user_script_location ]
470464
471465
472466class ProcessingJob (_Job ):
@@ -602,7 +596,7 @@ def __init__(
602596 source (str): The source for the input.
603597 destination (str): The destination of the input.
604598 input_name (str): The user-provided name for the input. If a name
605- is not provided, one will be generated.
599+ is not provided, one will be generated (eg. "input-1") .
606600 s3_data_type (str): Valid options are "ManifestFile" or "S3Prefix".
607601 s3_input_mode (str): Valid options are "Pipe" or "File".
608602 s3_data_distribution_type (str): Valid options are "FullyReplicated"
@@ -652,8 +646,10 @@ def __init__(self, source, destination=None, output_name=None, s3_upload_mode="E
652646
653647 Args:
654648 source (str): The source for the output.
655- destination (str): The destination of the output.
656- output_name (str): The name of the output.
649+ destination (str): The destination of the output. If a destination
650+ is not provided, one will be generated (eg. "s3://bucket/job_name/output").
651+ output_name (str): The name of the output. If a name
652+ is not provided, one will be generated (eg. "output-1").
657653 s3_upload_mode (str): Valid options are "EndOfJob" or "Continuous".
658654 """
659655 self .source = source
0 commit comments