Skip to content

Commit f110bf5

Browse files
JonaOtto and jgoodson authored
Wait_finished method for job API (regarding #240) (#242)
* Fix introduced typo in partition information dictionary key. (#241)
* Added wait_finished method to job class (#240).
* Added test method for wait_finished method of the job class.
* Added _load_single_job method to the job class to extract the slurm_load_job functionality.
* Updated find_id and wait_finished to use _load_single_job.

Co-authored-by: Jonathan Goodson <[email protected]>
1 parent 467667e commit f110bf5

File tree

2 files changed

+145
-17
lines changed

2 files changed

+145
-17
lines changed

pyslurm/pyslurm.pyx

Lines changed: 69 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -598,7 +598,7 @@ cdef class config:
598598
Ctl_dict['cpu_freq_govs'] = self.__Config_ptr.cpu_freq_govs
599599
Ctl_dict['cred_type'] = slurm.stringOrNone(self.__Config_ptr.cred_type, '')
600600
Ctl_dict['debug_flags'] = self.__Config_ptr.debug_flags
601-
Ctl_dict['def_mem_per_cp'] = self.__Config_ptr.def_mem_per_cpu
601+
Ctl_dict['def_mem_per_cpu'] = self.__Config_ptr.def_mem_per_cpu
602602
Ctl_dict['dependency_params'] = slurm.stringOrNone(self.__Config_ptr.dependency_params, '')
603603
Ctl_dict['eio_timeout'] = self.__Config_ptr.eio_timeout
604604
Ctl_dict['enforce_part_limits'] = bool(self.__Config_ptr.enforce_part_limits)
@@ -1023,16 +1023,16 @@ cdef class partition:
10231023

10241024
if record.def_mem_per_cpu & slurm.MEM_PER_CPU:
10251025
if record.def_mem_per_cpu == slurm.MEM_PER_CPU:
1026-
Part_dict['def_mem_per_cp'] = "UNLIMITED"
1026+
Part_dict['def_mem_per_cpu'] = "UNLIMITED"
10271027
Part_dict['def_mem_per_node'] = None
10281028
else:
1029-
Part_dict['def_mem_per_cp'] = record.def_mem_per_cpu & (~slurm.MEM_PER_CPU)
1029+
Part_dict['def_mem_per_cpu'] = record.def_mem_per_cpu & (~slurm.MEM_PER_CPU)
10301030
Part_dict['def_mem_per_node'] = None
10311031
elif record.def_mem_per_cpu == 0:
1032-
Part_dict['def_mem_per_cp'] = None
1032+
Part_dict['def_mem_per_cpu'] = None
10331033
Part_dict['def_mem_per_node'] = "UNLIMITED"
10341034
else:
1035-
Part_dict['def_mem_per_cp'] = None
1035+
Part_dict['def_mem_per_cpu'] = None
10361036
Part_dict['def_mem_per_node'] = record.def_mem_per_cpu
10371037

10381038
if record.default_time == slurm.INFINITE:
@@ -1774,35 +1774,55 @@ cdef class job:
17741774

17751775
return retList
17761776

1777-
def find_id(self, jobid):
1778-
"""Retrieve job ID data.
1777+
cdef _load_single_job(self, jobid):
1778+
"""
1779+
Uses slurm_load_job to set up self._job_ptr for a single job given by the jobid.
1780+
After calling this, the job pointer can be used in other methods
1781+
to operate on the information of the job.
17791782
1780-
This method accepts both string and integer formats of the jobid. It
1781-
calls slurm_xlate_job_id() to convert the jobid appropriately.
1782-
This works for single jobs and job arrays.
1783+
This method accepts both string and integer formats of the jobid. It
1784+
calls slurm_xlate_job_id to convert the jobid appropriately.
17831785
1784-
:param str jobid: Job id key string to search
1785-
:returns: List of dictionary of values for given job id
1786-
:rtype: `list`
1786+
Raises a ValueError if the jobid does not correspond to an existing job.
1787+
1788+
:param str jobid: The jobid
1789+
:returns: void
1790+
:rtype: None.
17871791
"""
17881792
cdef:
17891793
int apiError
17901794
int rc
17911795

1796+
# jobid can be given as int or string
17921797
if isinstance(jobid, int) or isinstance(jobid, long):
17931798
jobid = str(jobid).encode("UTF-8")
17941799
else:
17951800
jobid = jobid.encode("UTF-8")
1796-
1801+
# convert jobid appropriately for slurm
17971802
jobid_xlate = slurm.slurm_xlate_job_id(jobid)
1803+
1804+
# load the job which sets the self._job_ptr pointer
17981805
rc = slurm.slurm_load_job(&self._job_ptr, jobid_xlate, self._ShowFlags)
17991806

1800-
if rc == slurm.SLURM_SUCCESS:
1801-
return list(self.get_job_ptr().values())
1802-
else:
1807+
if rc != slurm.SLURM_SUCCESS:
18031808
apiError = slurm.slurm_get_errno()
18041809
raise ValueError(slurm.stringOrNone(slurm.slurm_strerror(apiError), ''), apiError)
18051810

1811+
def find_id(self, jobid):
    """Return the job data for a single job id.

    The jobid may be passed as a string or as an integer.  Loading is
    delegated to the internal helper ``_load_single_job`` (which wraps
    slurm_load_job); the loaded records are then converted by
    ``get_job_ptr``.  This works for single jobs and job arrays.

    :param str jobid: Job id key string to search
    :returns: List of dictionary of values for given job id
    :rtype: `list`
    :raises ValueError: if ``_load_single_job`` cannot load a job for
        the given jobid
    """
    # Populate self._job_ptr first; get_job_ptr() reads from it.
    self._load_single_job(jobid)
    job_records = self.get_job_ptr()
    return list(job_records.values())
1825+
18061826
def find_user(self, user):
18071827
"""Retrieve a user's job data.
18081828
@@ -2879,6 +2899,38 @@ cdef class job:
28792899
#return "Submitted batch job %s" % job_id
28802900
return job_id
28812901

2902+
def wait_finished(self, jobid):
    """Block until the job given by the jobid finishes.

    The job is re-loaded through ``_load_single_job`` and every returned
    record is checked with ``IS_JOB_COMPLETED``.  While at least one
    record is not completed, the method sleeps five seconds and polls
    again.  This works for single jobs, as well as job arrays.

    :param jobid: The job id of the slurm job.
        To reference a job with job array set, use the first/"master"
        jobid (the same as given by squeue).
    :returns: The exit code of the slurm job.  With several records
        (job array) the highest exit code of all records is returned; a
        record for which ``WIFEXITED`` is false counts as exit code 1.
    :rtype: `int`
    :raises ValueError: propagated from ``_load_single_job`` when the
        job cannot be loaded.
    """
    # Sentinel below any real exit status, so the first completed record
    # always replaces it through max().
    exit_status = -9999
    complete = False
    while not complete:
        # Assume completion; any unfinished record flips this back.
        complete = True
        self._load_single_job(jobid)
        for i in range(self._job_ptr.record_count):
            self._record = &self._job_ptr.job_array[i]
            if IS_JOB_COMPLETED(self._job_ptr.job_array[i]):
                if WIFEXITED(self._record.exit_code):
                    exit_status_arrayjob = WEXITSTATUS(self._record.exit_code)
                else:
                    exit_status_arrayjob = 1
                # set exit code to the highest of all jobs in job array
                exit_status = max(exit_status, exit_status_arrayjob)
            else:
                # go on with the next iteration, until all jobs in the
                # array are completed
                complete = False

        # Release the message loaded for this poll and clear the pointers
        # that referenced it, so nothing dangles into freed memory.
        slurm.slurm_free_job_info_msg(self._job_ptr)
        self._job_ptr = NULL
        self._record = NULL

        # Only wait when another poll is needed; an already finished job
        # returns immediately.
        if not complete:
            p_time.sleep(5)
    return exit_status
2933+
28822934

28832935
def slurm_pid2jobid(uint32_t JobPID=0):
28842936
"""Get the slurm job id from a process id.

tests/test_job.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,3 +110,79 @@ def test_job_kill():
110110
# time.sleep(3)
111111
# test_job_search_after = pyslurm.job().find_id(test_job_id)[0]
112112
# assert_equals(test_job_search_after.get("job_state"), "FAILED")
113+
114+
115+
def test_job_wait_finished():
    """Job: Test job().wait_finished()."""
    # Each scenario: (wrapped command, expected final state, expected exit code).
    scenarios = (
        ("sleep 30", "COMPLETED", 0),
        # "exit 1" should yield failure ending
        ("sleep 300; exit 1", "FAILED", 1),
    )
    for command, expected_state, expected_code in scenarios:
        job_spec = {
            "wrap": command,
            "job_name": "pyslurm_test_job",
            "ntasks": 1,
            "cpus_per_task": 1,
        }
        job_id = pyslurm.job().submit_batch_job(job_spec)
        state_before = pyslurm.job().find_id(job_id)[0]["job_state"]

        # wait for the job to finish
        exit_code = pyslurm.job().wait_finished(job_id)

        state_after = pyslurm.job().find_id(job_id)[0]["job_state"]
        assert state_before != "COMPLETED"
        assert state_after == expected_state
        assert exit_code == expected_code
151+
152+
153+
def test_job_wait_finished_w_arrays():
    """Job: Test job().wait_finished() with job arrays."""
    # Each scenario: (wrapped command, expected final state, expected exit code).
    scenarios = (
        ("sleep 30; exit 0", "COMPLETED", 0),
        # test for exit codes: maximum exit code of all array jobs.
        # use array ID as exit code to yield different exit codes: 0, 1, 2;
        # exit code 2 (the maximum of all) should yield FAILED for the
        # entire job
        ("sleep 30; exit $SLURM_ARRAY_TASK_ID", "FAILED", 2),
    )
    for command, expected_state, expected_code in scenarios:
        job_spec = {
            "wrap": command,
            "job_name": "pyslurm_array_test_job",
            "ntasks": 1,
            "cpus_per_task": 1,
            "array_inx": "0,1,2",
        }
        job_id = pyslurm.job().submit_batch_job(job_spec)
        state_before = pyslurm.job().find_id(job_id)[0]["job_state"]

        # wait for the job to finish
        exit_code = pyslurm.job().wait_finished(job_id)

        state_after = pyslurm.job().find_id(job_id)[0]["job_state"]
        assert state_before != "COMPLETED"
        assert state_after == expected_state
        assert exit_code == expected_code

0 commit comments

Comments
 (0)