11from dart .context .locator import injectable
22from dart .model .trigger import TriggerType
33from dart .trigger .base import TriggerProcessor
4+ from dart .model .workflow import WorkflowState , WorkflowInstanceState
5+ from dart .model .action import ActionState
46
7+ import logging
8+ import boto3
9+
10+ _logger = logging .getLogger (__name__ )
511
612zombie_check_trigger = TriggerType (
713 name = 'zombie_check' ,
1016
1117@injectable
1218class ZombieCheckTriggerProcessor (TriggerProcessor ):
13- def __init__ (self , trigger_proxy , workflow_service ):
19+ def __init__ (self , trigger_proxy , action_service , workflow_service ):
1420 self ._trigger_proxy = trigger_proxy
21+ self ._action_service = action_service
1522 self ._workflow_service = workflow_service
1623 self ._trigger_type = zombie_check_trigger
24+ self ._batch_client = boto3 .client ('batch' )
1725
1826 def trigger_type (self ):
1927 return self ._trigger_type
@@ -25,21 +33,87 @@ def initialize_trigger(self, trigger, trigger_service):
2533 def update_trigger (self , unmodified_trigger , modified_trigger ):
2634 return modified_trigger
2735
28- def evaluate_message (self , message , trigger_service ):
36+ def get_not_completed_workflow_instances (self , workflow_msg ):
37+ # get workflow associated with currently triggered workflow
38+ workflow_id = workflow_msg .get ('workflow_id' )
39+ wf = self ._workflow_service .get_workflow (workflow_id , raise_when_missing = False )
40+ if not wf :
41+ _logger .info ('Zombie Check: workflow (id={wf_id}) not found. log-info: {log_info}' .
42+ format (wf_id = workflow_id , log_info = workflow_msg .get ('log_info' )))
43+ return None
44+
45+ if wf .data .state != WorkflowState .ACTIVE :
46+ _logger .info ('Zombie Check: expected workflow (id={wf_id}) to be in ACTIVE state. log-info: {log_info}' .
47+ format (wf_id = workflow_id , log_info = workflow_msg .get ('log_info' )))
48+
49+ # get all workflow_instances of current workflow:
50+ NOT_COMPLETE_STATES = ['QUEUED' , 'RUNNING' ]
51+ all_wf_instances = self ._workflow_service .find_workflow_instances (workflow_id )
52+ current_wf_instances = [wf for wf in all_wf_instances if wf .data .state in NOT_COMPLETE_STATES ]
53+ _logger .info ('Zombie Check: Found workflow instance ids (workflow_id={0}) instances = {1}' .format (workflow_id , current_wf_instances ))
54+
55+ return current_wf_instances
56+
57+ def get_instance_actions (self , current_wf_instances ):
58+ # get all actions of nt completed workflow_instances and map ach action to its workflow_instance in action_2_wf_instance
59+ incomplete_actions = []
60+ action_2_wf_instance = {}
61+ for wf_instance in current_wf_instances :
62+ wf_instance_actions = self ._action_service .find_actions (workflow_instance_id = wf_instance .id )
63+ incomplete_actions .extend (wf_instance_actions )
64+ for action in wf_instance_actions :
65+ action_2_wf_instance [action .id ] = wf_instance
66+
67+ jobs_2_actions = {}
68+ for action in incomplete_actions :
69+ if action .data .batch_job_id :
70+ jobs_2_actions [action .data .batch_job_id ] = action
71+
72+ return incomplete_actions , jobs_2_actions , action_2_wf_instance
73+
74+ def handle_done_batch_jobs_with_not_complete_wf_instances (self , batch_jobs , jobs_2_actions , action_2_wf_instance ):
75+ for job in batch_jobs .get ('jobs' ):
76+ # jobs fail + action not-failed => fail workflow instance and action
77+ action = jobs_2_actions [job .get ('jobId' )]
78+ if action :
79+ if job .get ('status' ) == 'FAILED' and not (action .data .state in ['FAILED' , 'COMPLETED' ]):
80+ _logger .info ("Zombie Check: Job {0} is failed but action {0} is not failed/completed. Updating action and workflow_instance to FAILED" .format (job .get ('jobId' ), action .id ))
81+ self ._action_service .update_action_state (action , ActionState .FAILED , action .data .error_message )
82+ self ._workflow_service .update_workflow_instance_state (action_2_wf_instance [action .id ], WorkflowInstanceState .FAILED )
83+
84+ # Jobs complete + action not-failed => mark workflow instance as complete and mark actions as complete
85+ if job .get ('status' ) == 'COMPLETED' and not (action .data .state in ['FAILED' , 'COMPLETED' ]):
86+ _logger .info ("Zombie Check: Job {0} is completed but action {0} is not failed/completed. Updating action to COMPLETED" .format (job .get ("jobId" ), action .id ))
87+ self ._action_service .update_action_state (action , ActionState .COMPLETED , action .data .error_message )
88+
89+ def evaluate_message (self , workflow_msg , trigger_service ):
2990 """ :type message: dict
3091 :type trigger_service: dart.service.trigger.TriggerService """
31- # always trigger a manual message
32- ##self._workflow_service.run_triggered_workflow(message, self._trigger_type)
33- ## TODO - check batch vs dart here!
34- TODO = "Check batch state vs DART here."
35- # 1. retrieve running instance workflows (if any) actions.
36- #
92+
93+ current_wf_instances = self .get_not_completed_workflow_instances (workflow_msg )
94+ if not current_wf_instances :
95+ return []
96+
97+ # get all actions of not completed workflow_instances and map ach action to its workflow_instance in action_2_wf_instance
98+ incomplete_actions , jobs_2_actions , action_2_wf_instance = self .get_instance_actions (current_wf_instances )
99+ batch_job_ids = [job .data .batch_job_id for job in incomplete_actions ]
100+ _logger .info ("Zombie Check: extract job_ids {0} form incomplete actions {1}" .format (batch_job_ids , [act .id for act in incomplete_actions ]))
101+
102+ try :
103+ batch_jobs = self ._batch_client .describe_jobs (jobs = batch_job_ids )
104+ except Exception as err :
105+ _logger .error ("Zombie Check: failed to execute batch's describe_jobs. err = {0}" .format (err ))
106+ return []
107+
108+ self .handle_done_batch_jobs_with_not_complete_wf_instances (batch_jobs , jobs_2_actions , action_2_wf_instance )
37109
38110 # return an empty list since this is not associated with a particular trigger instance
39111 return []
40112
41113 def teardown_trigger (self , trigger , trigger_service ):
42114 pass
43115
44- def send_evaluation_message (self , workflow_json ):
45- self ._trigger_proxy .process_trigger (self ._trigger_type , workflow_json )
116+ def send_evaluation_message (self , workflow_msg ):
117+ self ._trigger_proxy .process_trigger (self ._trigger_type , workflow_msg )
118+
119+
0 commit comments