|
| 1 | +"""Parallel workflow execution via PBS/Torque |
| 2 | +""" |
| 3 | + |
| 4 | +import os |
| 5 | +import sys |
| 6 | + |
| 7 | +from .base import (GraphPluginBase, logger) |
| 8 | + |
| 9 | +from ...interfaces.base import CommandLine |
| 10 | + |
| 11 | + |
| 12 | +class SGEGraphPlugin(GraphPluginBase): |
| 13 | + """Execute using PBS/Torque |
| 14 | +
|
| 15 | + The plugin_args input to run can be used to control the SGE execution. |
| 16 | + Currently supported options are: |
| 17 | +
|
| 18 | + - template : template to use for batch job submission |
| 19 | + - qsub_args : arguments to be prepended to the job execution script in the |
| 20 | + qsub call |
| 21 | +
|
| 22 | + """ |
| 23 | + |
| 24 | + def __init__(self, **kwargs): |
| 25 | + self._template = """ |
| 26 | +#!/bin/bash |
| 27 | +#$ -V |
| 28 | +#$ -S /bin/bash |
| 29 | + """ |
| 30 | + self._qsub_args = '' |
| 31 | + if 'plugin_args' in kwargs: |
| 32 | + plugin_args = kwargs['plugin_args'] |
| 33 | + if 'template' in plugin_args: |
| 34 | + self._template = plugin_args['template'] |
| 35 | + if os.path.isfile(self._template): |
| 36 | + self._template = open(self._template).read() |
| 37 | + if 'qsub_args' in plugin_args: |
| 38 | + self._qsub_args = plugin_args['qsub_args'] |
| 39 | + super(SGEGraphPlugin, self).__init__(**kwargs) |
| 40 | + |
| 41 | + def _submit_graph(self, pyfiles, dependencies): |
| 42 | + batch_dir, _ = os.path.split(pyfiles[0]) |
| 43 | + submitjobsfile = os.path.join(batch_dir, 'submit_jobs.sh') |
| 44 | + with open(submitjobsfile, 'wt') as fp: |
| 45 | + fp.writelines('#!/usr/bin/env bash\n') |
| 46 | + for idx, pyscript in enumerate(pyfiles): |
| 47 | + batch_dir, name = os.path.split(pyscript) |
| 48 | + name = '.'.join(name.split('.')[:-1]) |
| 49 | + batchscript = '\n'.join((self._template, |
| 50 | + '%s %s' % (sys.executable, pyscript))) |
| 51 | + batchscriptfile = os.path.join(batch_dir, |
| 52 | + 'batchscript_%s.sh' % name) |
| 53 | + |
| 54 | + batchscriptoutfile = batchscriptfile + '.o' |
| 55 | + batchscripterrfile = batchscriptfile + '.e' |
| 56 | + |
| 57 | + with open(batchscriptfile, 'wt') as batchfp: |
| 58 | + batchfp.writelines(batchscript) |
| 59 | + batchfp.close() |
| 60 | + deps = '' |
| 61 | + if idx in dependencies: |
| 62 | + values = ' ' |
| 63 | + for jobid in dependencies[idx]: |
| 64 | + values += 'job%05d,' % jobid |
| 65 | + if 'job' in values: |
| 66 | + values = values.rstrip(',') |
| 67 | + deps = '-hold_jid%s' % values |
| 68 | + fp.writelines('job%05d=`qsub -o %s -e %s' + |
| 69 | + ' %s %s -N job%05d %s`\n' % ( |
| 70 | + idx, |
| 71 | + batchscriptoutfile, |
| 72 | + batchscripterrfile, |
| 73 | + self._qsub_args, deps, |
| 74 | + idx, |
| 75 | + batchscriptfile)) |
| 76 | + |
| 77 | + cmd = CommandLine('bash', environ=os.environ.data) |
| 78 | + cmd.inputs.args = '%s' % submitjobsfile |
| 79 | + cmd.run() |
| 80 | + logger.info('submitted all jobs to queue') |
0 commit comments