# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
Implementation of `ScatterGather` class for dealing with LSF batch jobs
"""
from __future__ import absolute_import, division, print_function
import sys
import os
import time
from fermipy.jobs.job_archive import JobStatus
from fermipy.jobs.sys_interface import clean_job, SysInterface
[docs]class NativeInterface(SysInterface):
"""Implmentation of ScatterGather that uses the native system"""
string_exited = 'Exited with exit code'
string_successful = 'Successfully completed'
def __init__(self, **kwargs):
"""C'tor
Keyword arguements
------------------
jobs_per_cycle : int [20]
Maximum number of jobs to submit in each cycle
time_per_cycle : int [15]
Time per submission cycle in seconds
max_job_age : int [90]
Max job age in minutes
"""
super(NativeInterface, self).__init__(**kwargs)
self._time_per_cycle = kwargs.pop('time_per_cycle', 15)
self._jobs_per_cycle = kwargs.pop('jobs_per_cycle', 20)
[docs] def dispatch_job_hook(self, link, key, job_config, logfile, stream=sys.stdout):
"""Send a single job to be executed
Parameters
----------
link : `fermipy.jobs.chain.Link`
The link used to invoke the command we are running
key : str
A string that identifies this particular instance of the job
job_config : dict
A dictionrary with the arguments for the job. Used with
the self._command_template job template
logfile : str
The logfile for this job, may be used to check for success/ failure
"""
full_sub_dict = job_config.copy()
full_command = "%s >& %s" % (
link.command_template().format(**full_sub_dict), logfile)
logdir = os.path.dirname(logfile)
if self._dry_run:
sys.stdout.write("%s\n" % full_command)
else:
try:
os.makedirs(logdir)
except OSError:
pass
os.system(full_command)
[docs] def submit_jobs(self, link, job_dict=None, job_archive=None, stream=sys.stdout):
"""Submit all the jobs in job_dict """
if link is None:
return JobStatus.no_job
if job_dict is None:
job_keys = link.jobs.keys()
else:
job_keys = sorted(job_dict.keys())
# copy & reverse the keys b/c we will be popping item off the back of
# the list
unsubmitted_jobs = job_keys
unsubmitted_jobs.reverse()
failed = False
while unsubmitted_jobs:
njob_to_submit = min(self._jobs_per_cycle,
len(unsubmitted_jobs))
if self._dry_run:
njob_to_submit = len(unsubmitted_jobs)
for i in range(njob_to_submit):
job_key = unsubmitted_jobs.pop()
# job_details = job_dict[job_key]
job_details = link.jobs[job_key]
job_config = job_details.job_config
if job_details.status == JobStatus.failed:
clean_job(job_details.logfile, {}, self._dry_run)
# clean_job(job_details.logfile,
# job_details.outfiles, self.args['dry_run'])
job_config['logfile'] = job_details.logfile
new_job_details = self.dispatch_job(link, job_key, job_archive)
if new_job_details.status == JobStatus.failed:
failed = True
clean_job(new_job_details.logfile,
new_job_details.outfiles, self._dry_run)
link.jobs[job_key] = new_job_details
if unsubmitted_jobs:
print('Sleeping %.0f seconds between submission cycles' %
self._time_per_cycle)
time.sleep(self._time_per_cycle)
return failed
[docs]def get_native_default_args():
""" Get the correct set of batch jobs arguments.
"""
native_default_args = dict(max_jobs=500,
time_per_cycle=15,
jobs_per_cycle=20,
max_job_age=90,
no_batch=False)
return native_default_args.copy()