@@ -669,8 +669,10 @@ def start(self, **kwargs):
669669 execution = _LocalPipelineExecution (execution_id , self .pipeline , ** kwargs )
670670
671671 self ._executions [execution_id ] = execution
672- print (
673- f"Starting execution for pipeline { self .pipeline .name } . Execution ID is { execution_id } "
672+ logger .info (
673+ "Starting execution for pipeline %s. Execution ID is %s" ,
674+ self .pipeline .name ,
675+ execution_id ,
674676 )
675677 self .last_modified_time = datetime .datetime .now ().timestamp ()
676678
@@ -733,31 +735,32 @@ def update_execution_success(self):
733735 """Mark execution as succeeded."""
734736 self .status = _LocalExecutionStatus .SUCCEEDED .value
735737 self .last_modified_time = datetime .datetime .now ().timestamp ()
736- print ( f "Pipeline execution { self . pipeline_execution_name } SUCCEEDED" )
738+ logger . info ( "Pipeline execution %s SUCCEEDED" , self . pipeline_execution_name )
737739
738740 def update_execution_failure (self , step_name , failure_message ):
739741 """Mark execution as failed."""
740742 self .status = _LocalExecutionStatus .FAILED .value
741743 self .failure_reason = f"Step '{ step_name } ' failed with message: { failure_message } "
742744 self .last_modified_time = datetime .datetime .now ().timestamp ()
743- print (
744- f"Pipeline execution { self .pipeline_execution_name } FAILED because step "
745- f"'{ step_name } ' failed."
745+ logger .info (
746+ "Pipeline execution %s FAILED because step '%s' failed." ,
747+ self .pipeline_execution_name ,
748+ step_name ,
746749 )
747750
748751 def update_step_properties (self , step_name , step_properties ):
749752 """Update pipeline step execution output properties."""
750753 self .step_execution .get (step_name ).update_step_properties (step_properties )
751- print ( f "Pipeline step '{ step_name } ' SUCCEEDED." )
754+ logger . info ( "Pipeline step '%s ' SUCCEEDED." , step_name )
752755
753756 def update_step_failure (self , step_name , failure_message ):
754757 """Mark step_name as failed."""
755- print ( f "Pipeline step '{ step_name } ' FAILED. Failure message is: { failure_message } " )
758+ logger . info ( "Pipeline step '%s ' FAILED. Failure message is: %s" , step_name , failure_message )
756759 self .step_execution .get (step_name ).update_step_failure (failure_message )
757760
758761 def mark_step_executing (self , step_name ):
759762 """Update pipelines step's status to EXECUTING and start_time to now."""
760- print ( f "Starting pipeline step: '{ step_name } '" )
763+ logger . info ( "Starting pipeline step: '%s'" , step_name )
761764 self .step_execution .get (step_name ).mark_step_executing ()
762765
763766 def _initialize_step_execution (self , steps ):
0 commit comments