2222import attr
2323import botocore
2424import pytz
25- from botocore .exceptions import ClientError
25+ from botocore .exceptions import ClientError , WaiterError
2626
2727from sagemaker import s3
2828from sagemaker ._studio import _append_project_tags
2929from sagemaker .config import PIPELINE_ROLE_ARN_PATH , PIPELINE_TAGS_PATH
3030from sagemaker .remote_function .core .serialization import deserialize_obj_from_s3
3131from sagemaker .remote_function .core .stored_function import RESULTS_FOLDER
32+ from sagemaker .remote_function .errors import RemoteFunctionError
3233from sagemaker .remote_function .job import JOBS_CONTAINER_ENTRYPOINT
3334from sagemaker .s3_utils import s3_path_join
3435from sagemaker .session import Session
@@ -965,8 +966,13 @@ def result(self, step_name: str):
965966
966967 Raises:
967968 ValueError if the provided step is not a ``@step`` decorated function.
969+ RemoteFunctionError if the provided step is not in "Completed" status
968970 """
969- self .wait ()
971+ try :
972+ self .wait ()
973+ except WaiterError as e :
974+ if "Waiter encountered a terminal failure state" in str (e ):
975+ pass
970976 step = next (filter (lambda x : x ["StepName" ] == step_name , self .list_steps ()), None )
971977 if not step :
972978 raise ValueError (f"Invalid step name { step_name } " )
@@ -986,15 +992,22 @@ def result(self, step_name: str):
986992 ]
987993 if container_args != JOBS_CONTAINER_ENTRYPOINT :
988994 raise ValueError (
989- "This method can only be used on pipeline steps created using @step" " decorator."
995+ "This method can only be used on pipeline steps created using @step decorator."
990996 )
991-
992997 s3_output_path = describe_training_job_response ["OutputDataConfig" ]["S3OutputPath" ]
993- return deserialize_obj_from_s3 (
994- sagemaker_session = self .sagemaker_session ,
995- s3_uri = s3_path_join (s3_output_path , self .arn .split ("/" )[- 1 ], step_name , RESULTS_FOLDER ),
996- hmac_key = describe_training_job_response ["Environment" ]["REMOTE_FUNCTION_SECRET_KEY" ],
997- )
998+
999+ job_status = describe_training_job_response ["TrainingJobStatus" ]
1000+ if job_status == "Completed" :
1001+ return deserialize_obj_from_s3 (
1002+ sagemaker_session = self .sagemaker_session ,
1003+ s3_uri = s3_path_join (
1004+ s3_output_path , self .arn .split ("/" )[- 1 ], step_name , RESULTS_FOLDER
1005+ ),
1006+ hmac_key = describe_training_job_response ["Environment" ][
1007+ "REMOTE_FUNCTION_SECRET_KEY"
1008+ ],
1009+ )
1010+ raise RemoteFunctionError (f"Pipeline step { step_name } is in { job_status } status." )
9981011
9991012
10001013class PipelineGraph :
0 commit comments