From 671ea0532c9c46082a3fd21d54398f2b9d1940f7 Mon Sep 17 00:00:00 2001 From: oesteban Date: Fri, 1 Dec 2017 13:58:44 -0800 Subject: [PATCH 1/3] [ENH] Logging - MultiProc report current tasks When the verbosity of logs is >= INFO, a list of currently running tasks is generated and printed out. --- nipype/pipeline/plugins/multiproc.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py index b26d029518..8201a2548f 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -12,10 +12,11 @@ from multiprocessing import Process, Pool, cpu_count, pool from traceback import format_exception import sys +from textwrap import indent +from logging import INFO from copy import deepcopy import numpy as np - from ... import logging from ...utils.profiler import get_system_total_memory_gb from ..engine import MapNode @@ -126,7 +127,7 @@ def __init__(self, plugin_args=None): self.raise_insufficient = self.plugin_args.get('raise_insufficient', True) # Instantiate different thread pools for non-daemon processes - logger.debug('MultiProcPlugin starting in "%sdaemon" mode (n_procs=%d, mem_gb=%0.2f)', + logger.debug('[MultiProc] Starting in "%sdaemon" mode (n_procs=%d, mem_gb=%0.2f)', 'non' * int(non_daemon), self.processors, self.memory_gb) NipypePool = NonDaemonPool if non_daemon else Pool @@ -158,7 +159,7 @@ def _submit_job(self, node, updatehash=False): run_node, (node, updatehash, self._taskid), callback=self._async_callback) - logger.debug('MultiProc submitted task %s (taskid=%d).', + logger.debug('[MultiProc] Submitted task %s (taskid=%d).', node.fullname, self._taskid) return self._taskid @@ -214,9 +215,17 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): stats = (len(self.pending_tasks), len(jobids), free_memory_gb, self.memory_gb, free_processors, self.processors) if self._stats != stats: - logger.info('Currently running %d tasks, and %d jobs ready. Free ' - 'memory (GB): %0.2f/%0.2f, Free processors: %d/%d', - *stats) + tasks_list_msg = '' + if logger.level <= INFO: + running_tasks = [' * %s' % self.procs[jobid].fullname + for _, jobid in self.pending_tasks] + if running_tasks: + tasks_list_msg = '\nCurrently running:\n' + tasks_list_msg += '\n'.join(running_tasks) + tasks_list_msg = indent(tasks_list_msg, ' ' * 21) + logger.info('[MultiProc] Running %d tasks, and %d jobs ready. Free ' + 'memory (GB): %0.2f/%0.2f, Free processors: %d/%d.%s', + *stats, tasks_list_msg) self._stats = stats if free_memory_gb < 0.01 or free_processors == 0: From 89155916db9c8a81f9ca5579c4cb4dea82e34e82 Mon Sep 17 00:00:00 2001 From: Oscar Esteban Date: Sat, 2 Dec 2017 07:53:04 -0800 Subject: [PATCH 2/3] fix logging in PY2 --- nipype/pipeline/plugins/multiproc.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py index 8201a2548f..1eb773c0f6 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -216,6 +216,7 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): self.memory_gb, free_processors, self.processors) if self._stats != stats: tasks_list_msg = '' + if logger.level <= INFO: running_tasks = [' * %s' % self.procs[jobid].fullname for _, jobid in self.pending_tasks] @@ -225,7 +226,8 @@ def _send_procs_to_workers(self, updatehash=False, graph=None): tasks_list_msg = indent(tasks_list_msg, ' ' * 21) logger.info('[MultiProc] Running %d tasks, and %d jobs ready. Free ' 'memory (GB): %0.2f/%0.2f, Free processors: %d/%d.%s', - *stats, tasks_list_msg) + len(self.pending_tasks), len(jobids), free_memory_gb, self.memory_gb, + free_processors, self.processors, tasks_list_msg) self._stats = stats if free_memory_gb < 0.01 or free_processors == 0: From b366e918e656518aa87833171b3ce1a8cac52737 Mon Sep 17 00:00:00 2001 From: Oscar Esteban Date: Sat, 2 Dec 2017 18:12:54 -0800 Subject: [PATCH 3/3] add replacement for textwrap.indent --- nipype/pipeline/plugins/multiproc.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/nipype/pipeline/plugins/multiproc.py b/nipype/pipeline/plugins/multiproc.py index 1eb773c0f6..194540116c 100644 --- a/nipype/pipeline/plugins/multiproc.py +++ b/nipype/pipeline/plugins/multiproc.py @@ -12,7 +12,6 @@ from multiprocessing import Process, Pool, cpu_count, pool from traceback import format_exception import sys -from textwrap import indent from logging import INFO from copy import deepcopy @@ -22,6 +21,16 @@ from ..engine import MapNode from .base import DistributedPluginBase +try: + from textwrap import indent +except ImportError: + def indent(text, prefix): + """ A textwrap.indent replacement for Python < 3.3 """ + if not prefix: + return text + splittext = text.splitlines(True) + return prefix + prefix.join(splittext) + # Init logger logger = logging.getLogger('workflow')