|
9 | 9 |
|
10 | 10 | import os
|
11 | 11 | import os.path as op
|
| 12 | +from pathlib import Path |
12 | 13 | import shutil
|
13 | 14 | import socket
|
14 | 15 | from copy import deepcopy
|
|
30 | 31 | load_json,
|
31 | 32 | emptydirs,
|
32 | 33 | savepkl,
|
33 |
| - indirectory, |
34 | 34 | silentrm,
|
35 | 35 | )
|
36 | 36 |
|
|
64 | 64 | logger = logging.getLogger("nipype.workflow")
|
65 | 65 |
|
66 | 66 |
|
| 67 | +class NodeExecutionError(RuntimeError): |
| 68 | + """A nipype-specific name for exceptions when executing a Node.""" |
| 69 | + |
| 70 | + |
67 | 71 | class Node(EngineBase):
|
68 | 72 | """
|
69 | 73 | Wraps interface objects for use in pipeline
|
@@ -98,7 +102,7 @@ def __init__(
|
98 | 102 | run_without_submitting=False,
|
99 | 103 | n_procs=None,
|
100 | 104 | mem_gb=0.20,
|
101 |
| - **kwargs |
| 105 | + **kwargs, |
102 | 106 | ):
|
103 | 107 | """
|
104 | 108 | Parameters
|
@@ -439,7 +443,8 @@ def run(self, updatehash=False):
|
439 | 443 | )
|
440 | 444 |
|
441 | 445 | # Check hash, check whether run should be enforced
|
442 |
| - logger.info('[Node] Setting-up "%s" in "%s".', self.fullname, outdir) |
| 446 | + if not isinstance(self, MapNode): |
| 447 | + logger.info(f'[Node] Setting-up "{self.fullname}" in "{outdir}".') |
443 | 448 | cached, updated = self.is_cached()
|
444 | 449 |
|
445 | 450 | # If the node is cached, check on pklz files and finish
|
@@ -530,7 +535,6 @@ def run(self, updatehash=False):
|
530 | 535 | # Tear-up after success
|
531 | 536 | shutil.move(hashfile_unfinished, hashfile_unfinished.replace("_unfinished", ""))
|
532 | 537 | write_node_report(self, result=result, is_mapnode=isinstance(self, MapNode))
|
533 |
| - logger.info('[Node] Finished "%s".', self.fullname) |
534 | 538 | return result
|
535 | 539 |
|
536 | 540 | def _get_hashval(self):
|
@@ -582,7 +586,7 @@ def _get_inputs(self):
|
582 | 586 | logger.critical("%s", e)
|
583 | 587 |
|
584 | 588 | if outputs is None:
|
585 |
| - raise RuntimeError( |
| 589 | + raise NodeExecutionError( |
586 | 590 | """\
|
587 | 591 | Error populating the inputs of node "%s": the results file of the source node \
|
588 | 592 | (%s) does not contain any outputs."""
|
@@ -697,79 +701,56 @@ def _run_command(self, execute, copyfiles=True):
|
697 | 701 | )
|
698 | 702 | return result
|
699 | 703 |
|
700 |
| - outdir = self.output_dir() |
701 |
| - # Run command: either execute is true or load_results failed. |
702 |
| - result = InterfaceResult( |
703 |
| - interface=self._interface.__class__, |
704 |
| - runtime=Bunch( |
705 |
| - cwd=outdir, |
706 |
| - returncode=1, |
707 |
| - environ=dict(os.environ), |
708 |
| - hostname=socket.gethostname(), |
709 |
| - ), |
710 |
| - inputs=self._interface.inputs.get_traitsfree(), |
711 |
| - ) |
712 |
| - |
| 704 | + outdir = Path(self.output_dir()) |
713 | 705 | if copyfiles:
|
714 | 706 | self._originputs = deepcopy(self._interface.inputs)
|
715 | 707 | self._copyfiles_to_wd(execute=execute)
|
716 | 708 |
|
717 |
| - message = '[Node] Running "{}" ("{}.{}")'.format( |
718 |
| - self.name, self._interface.__module__, self._interface.__class__.__name__ |
| 709 | + # Run command: either execute is true or load_results failed. |
| 710 | + logger.info( |
| 711 | + f'[Node] Executing "{self.name}" <{self._interface.__module__}' |
| 712 | + f".{self._interface.__class__.__name__}>" |
| 713 | + ) |
| 714 | + # Invoke core run method of the interface ignoring exceptions |
| 715 | + result = self._interface.run(cwd=outdir, ignore_exception=True) |
| 716 | + logger.info( |
| 717 | + f'[Node] Finished "{self.name}", elapsed time {result.runtime.duration}s.' |
719 | 718 | )
|
| 719 | + |
720 | 720 | if issubclass(self._interface.__class__, CommandLine):
|
721 |
| - try: |
722 |
| - with indirectory(outdir): |
723 |
| - cmd = self._interface.cmdline |
724 |
| - except Exception as msg: |
725 |
| - result.runtime.stderr = "{}\n\n{}".format( |
726 |
| - getattr(result.runtime, "stderr", ""), msg |
727 |
| - ) |
728 |
| - _save_resultfile( |
729 |
| - result, |
730 |
| - outdir, |
731 |
| - self.name, |
732 |
| - rebase=str2bool(self.config["execution"]["use_relative_paths"]), |
733 |
| - ) |
734 |
| - raise |
735 |
| - cmdfile = op.join(outdir, "command.txt") |
736 |
| - with open(cmdfile, "wt") as fd: |
737 |
| - print(cmd + "\n", file=fd) |
738 |
| - message += ", a CommandLine Interface with command:\n{}".format(cmd) |
739 |
| - logger.info(message) |
740 |
| - try: |
741 |
| - result = self._interface.run(cwd=outdir) |
742 |
| - except Exception as msg: |
743 |
| - result.runtime.stderr = "%s\n\n%s".format( |
744 |
| - getattr(result.runtime, "stderr", ""), msg |
745 |
| - ) |
746 |
| - _save_resultfile( |
747 |
| - result, |
| 721 | + # Write out command line as it happened |
| 722 | + Path.write_text(outdir / "command.txt", f"{result.runtime.cmdline}\n") |
| 723 | + |
| 724 | + exc_tb = getattr(result.runtime, "traceback", None) |
| 725 | + |
| 726 | + if not exc_tb: |
| 727 | + # Clean working directory if no errors |
| 728 | + dirs2keep = None |
| 729 | + if isinstance(self, MapNode): |
| 730 | + dirs2keep = [op.join(outdir, "mapflow")] |
| 731 | + |
| 732 | + result.outputs = clean_working_directory( |
| 733 | + result.outputs, |
748 | 734 | outdir,
|
749 |
| - self.name, |
750 |
| - rebase=str2bool(self.config["execution"]["use_relative_paths"]), |
| 735 | + self._interface.inputs, |
| 736 | + self.needed_outputs, |
| 737 | + self.config, |
| 738 | + dirs2keep=dirs2keep, |
751 | 739 | )
|
752 |
| - raise |
753 |
| - |
754 |
| - dirs2keep = None |
755 |
| - if isinstance(self, MapNode): |
756 |
| - dirs2keep = [op.join(outdir, "mapflow")] |
757 | 740 |
|
758 |
| - result.outputs = clean_working_directory( |
759 |
| - result.outputs, |
760 |
| - outdir, |
761 |
| - self._interface.inputs, |
762 |
| - self.needed_outputs, |
763 |
| - self.config, |
764 |
| - dirs2keep=dirs2keep, |
765 |
| - ) |
| 741 | + # Store results file under all circumstances |
766 | 742 | _save_resultfile(
|
767 | 743 | result,
|
768 | 744 | outdir,
|
769 | 745 | self.name,
|
770 | 746 | rebase=str2bool(self.config["execution"]["use_relative_paths"]),
|
771 | 747 | )
|
772 | 748 |
|
| 749 | + if exc_tb: |
| 750 | + raise NodeExecutionError( |
| 751 | + f"Exception raised while executing Node {self.name}.\n\n{result.runtime.traceback}" |
| 752 | + ) |
| 753 | + |
773 | 754 | return result
|
774 | 755 |
|
775 | 756 | def _copyfiles_to_wd(self, execute=True, linksonly=False):
|
@@ -1290,7 +1271,7 @@ def _collate_results(self, nodes):
|
1290 | 1271 | if code is not None:
|
1291 | 1272 | msg += ["Subnode %d failed" % i]
|
1292 | 1273 | msg += ["Error: %s" % str(code)]
|
1293 |
| - raise Exception( |
| 1274 | + raise NodeExecutionError( |
1294 | 1275 | "Subnodes of node: %s failed:\n%s" % (self.name, "\n".join(msg))
|
1295 | 1276 | )
|
1296 | 1277 |
|
|
0 commit comments