Skip to content

Commit 7080ef9

Browse files
authored
Merge pull request #3347 from oesteban/enh/interface-cleanup
RF: Clean-up the BaseInterface ``run()`` function using context
2 parents 72aac96 + 24f2cbc commit 7080ef9

File tree

3 files changed

+186
-130
lines changed

3 files changed

+186
-130
lines changed

nipype/interfaces/base/core.py

Lines changed: 51 additions & 127 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,31 @@
1-
# -*- coding: utf-8 -*-
21
# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
32
# vi: set ft=python sts=4 ts=4 sw=4 et:
43
"""
54
Nipype interfaces core
65
......................
76
8-
97
Defines the ``Interface`` API and the body of the
108
most basic interfaces.
119
The I/O specifications corresponding to these base
1210
interfaces are found in the ``specs`` module.
1311
1412
"""
15-
from copy import deepcopy
16-
from datetime import datetime as dt
1713
import os
18-
import platform
1914
import subprocess as sp
2015
import shlex
21-
import sys
2216
import simplejson as json
23-
from dateutil.parser import parse as parseutc
2417
from traits.trait_errors import TraitError
2518

2619
from ... import config, logging, LooseVersion
2720
from ...utils.provenance import write_provenance
28-
from ...utils.misc import str2bool, rgetcwd
29-
from ...utils.filemanip import split_filename, which, get_dependencies, canonicalize_env
21+
from ...utils.misc import str2bool
22+
from ...utils.filemanip import (
23+
canonicalize_env,
24+
get_dependencies,
25+
indirectory,
26+
split_filename,
27+
which,
28+
)
3029
from ...utils.subprocess import run_command
3130

3231
from ...external.due import due
@@ -39,7 +38,12 @@
3938
MpiCommandLineInputSpec,
4039
get_filecopy_info,
4140
)
42-
from .support import Bunch, InterfaceResult, NipypeInterfaceError, format_help
41+
from .support import (
42+
RuntimeContext,
43+
InterfaceResult,
44+
NipypeInterfaceError,
45+
format_help,
46+
)
4347

4448
iflogger = logging.getLogger("nipype.interface")
4549

@@ -63,8 +67,15 @@ class Interface(object):
6367
6468
"""
6569

66-
input_spec = None # A traited input specification
67-
output_spec = None # A traited output specification
70+
input_spec = None
71+
"""
72+
The specification of the input, defined by a :py:class:`~traits.has_traits.HasTraits` class.
73+
"""
74+
output_spec = None
75+
"""
76+
The specification of the output, defined by a :py:class:`~traits.has_traits.HasTraits` class.
77+
"""
78+
6879
_can_resume = False # See property below
6980
_always_run = False # See property below
7081

@@ -365,131 +376,44 @@ def run(self, cwd=None, ignore_exception=None, **inputs):
365376
if successful, results
366377
367378
"""
368-
from ...utils.profiler import ResourceMonitor
369-
370-
# if ignore_exception is not provided, taking self.ignore_exception
371-
if ignore_exception is None:
372-
ignore_exception = self.ignore_exception
373-
374-
# Tear-up: get current and prev directories
375-
syscwd = rgetcwd(error=False) # Recover when wd does not exist
376-
if cwd is None:
377-
cwd = syscwd
378-
379-
os.chdir(cwd) # Change to the interface wd
379+
rtc = RuntimeContext(
380+
resource_monitor=config.resource_monitor and self.resource_monitor,
381+
ignore_exception=ignore_exception
382+
if ignore_exception is not None
383+
else self.ignore_exception,
384+
)
380385

381-
enable_rm = config.resource_monitor and self.resource_monitor
382-
self.inputs.trait_set(**inputs)
386+
with indirectory(cwd or os.getcwd()):
387+
self.inputs.trait_set(**inputs)
383388
self._check_mandatory_inputs()
384389
self._check_version_requirements(self.inputs)
385-
interface = self.__class__
386-
self._duecredit_cite()
387390

388-
# initialize provenance tracking
389-
store_provenance = str2bool(
390-
config.get("execution", "write_provenance", "false")
391-
)
392-
env = deepcopy(dict(os.environ))
393-
if self._redirect_x:
394-
env["DISPLAY"] = config.get_display()
395-
396-
runtime = Bunch(
397-
cwd=cwd,
398-
prevcwd=syscwd,
399-
returncode=None,
400-
duration=None,
401-
environ=env,
402-
startTime=dt.isoformat(dt.utcnow()),
403-
endTime=None,
404-
platform=platform.platform(),
405-
hostname=platform.node(),
406-
version=self.version,
407-
)
408-
runtime_attrs = set(runtime.dictcopy())
409-
410-
mon_sp = None
411-
if enable_rm:
412-
mon_freq = float(config.get("execution", "resource_monitor_frequency", 1))
413-
proc_pid = os.getpid()
414-
iflogger.debug(
415-
"Creating a ResourceMonitor on a %s interface, PID=%d.",
416-
self.__class__.__name__,
417-
proc_pid,
418-
)
419-
mon_sp = ResourceMonitor(proc_pid, freq=mon_freq)
420-
mon_sp.start()
391+
with rtc(self, cwd=cwd, redirect_x=self._redirect_x) as runtime:
421392

422-
# Grab inputs now, as they should not change during execution
423-
inputs = self.inputs.get_traitsfree()
424-
outputs = None
425-
426-
try:
393+
# Grab inputs now, as they should not change during execution
394+
inputs = self.inputs.get_traitsfree()
395+
outputs = None
396+
# Run interface
427397
runtime = self._pre_run_hook(runtime)
428398
runtime = self._run_interface(runtime)
429399
runtime = self._post_run_hook(runtime)
400+
# Collect outputs
430401
outputs = self.aggregate_outputs(runtime)
431-
except Exception as e:
432-
import traceback
433-
434-
# Retrieve the maximum info fast
435-
runtime.traceback = traceback.format_exc()
436-
# Gather up the exception arguments and append nipype info.
437-
exc_args = e.args if getattr(e, "args") else tuple()
438-
exc_args += (
439-
"An exception of type %s occurred while running interface %s."
440-
% (type(e).__name__, self.__class__.__name__),
441-
)
442-
if config.get("logging", "interface_level", "info").lower() == "debug":
443-
exc_args += ("Inputs: %s" % str(self.inputs),)
444-
445-
runtime.traceback_args = ("\n".join(["%s" % arg for arg in exc_args]),)
446-
447-
if not ignore_exception:
448-
raise
449-
finally:
450-
if runtime is None or runtime_attrs - set(runtime.dictcopy()):
451-
raise RuntimeError(
452-
"{} interface failed to return valid "
453-
"runtime object".format(interface.__class__.__name__)
454-
)
455-
# This needs to be done always
456-
runtime.endTime = dt.isoformat(dt.utcnow())
457-
timediff = parseutc(runtime.endTime) - parseutc(runtime.startTime)
458-
runtime.duration = (
459-
timediff.days * 86400 + timediff.seconds + timediff.microseconds / 1e6
460-
)
461-
results = InterfaceResult(
462-
interface, runtime, inputs=inputs, outputs=outputs, provenance=None
463-
)
464402

465-
# Add provenance (if required)
466-
if store_provenance:
467-
# Provenance will only throw a warning if something went wrong
468-
results.provenance = write_provenance(results)
469-
470-
# Make sure runtime profiler is shut down
471-
if enable_rm:
472-
import numpy as np
473-
474-
mon_sp.stop()
475-
476-
runtime.mem_peak_gb = None
477-
runtime.cpu_percent = None
478-
479-
# Read .prof file in and set runtime values
480-
vals = np.loadtxt(mon_sp.fname, delimiter=",")
481-
if vals.size:
482-
vals = np.atleast_2d(vals)
483-
runtime.mem_peak_gb = vals[:, 2].max() / 1024
484-
runtime.cpu_percent = vals[:, 1].max()
485-
486-
runtime.prof_dict = {
487-
"time": vals[:, 0].tolist(),
488-
"cpus": vals[:, 1].tolist(),
489-
"rss_GiB": (vals[:, 2] / 1024).tolist(),
490-
"vms_GiB": (vals[:, 3] / 1024).tolist(),
491-
}
492-
os.chdir(syscwd)
403+
results = InterfaceResult(
404+
self.__class__,
405+
rtc.runtime,
406+
inputs=inputs,
407+
outputs=outputs,
408+
provenance=None,
409+
)
410+
411+
# Add provenance (if required)
412+
if str2bool(config.get("execution", "write_provenance", "false")):
413+
# Provenance will only throw a warning if something went wrong
414+
results.provenance = write_provenance(results)
415+
416+
self._duecredit_cite()
493417

494418
return results
495419

nipype/interfaces/base/support.py

Lines changed: 96 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,113 @@
88
99
"""
1010
import os
11+
from contextlib import AbstractContextManager
1112
from copy import deepcopy
1213
from textwrap import wrap
1314
import re
15+
from datetime import datetime as dt
16+
from dateutil.parser import parse as parseutc
17+
import platform
1418

15-
from ... import logging
16-
from ...utils.misc import is_container
19+
from ... import logging, config
20+
from ...utils.misc import is_container, rgetcwd
1721
from ...utils.filemanip import md5, hash_infile
1822

1923
iflogger = logging.getLogger("nipype.interface")
2024

2125
HELP_LINEWIDTH = 70
2226

2327

28+
class RuntimeContext(AbstractContextManager):
29+
"""A context manager to run NiPype interfaces."""
30+
31+
__slots__ = ("_runtime", "_resmon", "_ignore_exc")
32+
33+
def __init__(self, resource_monitor=False, ignore_exception=False):
34+
"""Initialize the context manager object."""
35+
self._ignore_exc = ignore_exception
36+
_proc_pid = os.getpid()
37+
if resource_monitor:
38+
from ...utils.profiler import ResourceMonitor
39+
else:
40+
from ...utils.profiler import ResourceMonitorMock as ResourceMonitor
41+
42+
self._resmon = ResourceMonitor(
43+
_proc_pid,
44+
freq=float(config.get("execution", "resource_monitor_frequency", 1)),
45+
)
46+
47+
def __call__(self, interface, cwd=None, redirect_x=False):
48+
"""Generate a new runtime object."""
49+
# Tear-up: get current and prev directories
50+
_syscwd = rgetcwd(error=False) # Recover when wd does not exist
51+
if cwd is None:
52+
cwd = _syscwd
53+
54+
self._runtime = Bunch(
55+
cwd=str(cwd),
56+
duration=None,
57+
endTime=None,
58+
environ=deepcopy(dict(os.environ)),
59+
hostname=platform.node(),
60+
interface=interface.__class__.__name__,
61+
platform=platform.platform(),
62+
prevcwd=str(_syscwd),
63+
redirect_x=redirect_x,
64+
resmon=self._resmon.fname or "off",
65+
returncode=None,
66+
startTime=None,
67+
version=interface.version,
68+
)
69+
return self
70+
71+
def __enter__(self):
72+
"""Tear-up the execution of an interface."""
73+
if self._runtime.redirect_x:
74+
self._runtime.environ["DISPLAY"] = config.get_display()
75+
76+
self._runtime.startTime = dt.isoformat(dt.utcnow())
77+
self._resmon.start()
78+
# TODO: Perhaps clean-up path and ensure it exists?
79+
os.chdir(self._runtime.cwd)
80+
return self._runtime
81+
82+
def __exit__(self, exc_type, exc_value, exc_tb):
83+
"""Tear-down interface execution."""
84+
self._runtime.endTime = dt.isoformat(dt.utcnow())
85+
timediff = parseutc(self._runtime.endTime) - parseutc(self._runtime.startTime)
86+
self._runtime.duration = (
87+
timediff.days * 86400 + timediff.seconds + timediff.microseconds / 1e6
88+
)
89+
# Collect monitored data
90+
for k, v in self._resmon.stop():
91+
setattr(self._runtime, k, v)
92+
93+
os.chdir(self._runtime.prevcwd)
94+
95+
if exc_type is not None or exc_value is not None or exc_tb is not None:
96+
import traceback
97+
98+
# Retrieve the maximum info fast
99+
self._runtime.traceback = "".join(
100+
traceback.format_exception(exc_type, exc_value, exc_tb)
101+
)
102+
# Gather up the exception arguments and append nipype info.
103+
exc_args = exc_value.args if getattr(exc_value, "args") else tuple()
104+
exc_args += (
105+
f"An exception of type {exc_type.__name__} occurred while "
106+
f"running interface {self._runtime.interface}.",
107+
)
108+
self._runtime.traceback_args = ("\n".join([f"{arg}" for arg in exc_args]),)
109+
110+
if self._ignore_exc:
111+
return True
112+
113+
@property
114+
def runtime(self):
115+
return self._runtime
116+
117+
24118
class NipypeInterfaceError(Exception):
25119
"""Custom error for interfaces"""
26120

0 commit comments

Comments
 (0)