# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
Abstract interface for interactions with system for launching jobs.
"""
from __future__ import absolute_import, division, print_function
import os
import sys
from fermipy.jobs.job_archive import JobStatus
def remove_file(filepath, dry_run=False):
    """Delete the file at `filepath`, or just announce the deletion.

    Parameters
    ----------
    filepath : str
        Path of the file to delete.
    dry_run : bool
        If True, write ``rm <filepath>`` to `sys.stdout` and leave the
        file untouched.

    Any `OSError` raised by the deletion (e.g. the file does not exist)
    is silently ignored.
    """
    if not dry_run:
        try:
            os.remove(filepath)
        except OSError:
            # Best-effort removal: a missing file is not an error here.
            pass
        return
    sys.stdout.write("rm %s\n" % filepath)
def clean_job(logfile, outfiles, dry_run=False):
    """Remove a job's log file and every file listed in `outfiles`.

    Parameters
    ----------
    logfile : str
        Path to the log file; removed first.
    outfiles : dict
        Mapping whose values are the paths of the files to remove.
    dry_run : bool
        Forwarded to `remove_file`; if True the file names are printed
        but nothing is removed.
    """
    remove_file(logfile, dry_run)
    for produced_path in outfiles.values():
        remove_file(produced_path, dry_run)
def check_log(logfile, exited='Exited with exit code',
              successful='Successfully completed'):
    """Check a log file to determine status of LSF job

    Often logfile doesn't exist because the job hasn't begun
    to run; in that case `JobStatus.ready` is returned.

    Parameters
    ----------
    logfile : str
        String with path to logfile
    exited : str
        Value to check for in existing logfile for exit with failure
    successful : str
        Value to check for in existing logfile for success

    Returns
    -------
    status : `JobStatus`
        `JobStatus.ready` if the log file does not exist,
        `JobStatus.failed` if it contains `exited`,
        `JobStatus.done` if it contains `successful` (and not `exited`),
        `JobStatus.running` otherwise.
    """
    if not os.path.exists(logfile):
        return JobStatus.ready

    # Read the log once and close the handle deterministically; the
    # failure marker is tested first so it wins if both markers appear.
    with open(logfile) as log_stream:
        log_text = log_stream.read()

    if exited in log_text:
        return JobStatus.failed
    if successful in log_text:
        return JobStatus.done
    return JobStatus.running
class SysInterface(object):
    """Base class to handle job dispatching interface.

    Sub-classes are expected to override `dispatch_job_hook`, which
    raises `NotImplementedError` here.
    """
    # Marker strings that `check_job` looks for in a job's log file.
    string_exited = 'Exited with exit code'
    string_successful = 'Successfully completed'

    def __init__(self, **kwargs):
        """C'tor

        Keyword arguments
        -----------------
        dry_run : bool
            Passed to `clean_job`; if True, files are listed instead of
            removed. Defaults to False.
        job_check_sleep
            Stored as `self._job_check_sleep`; not used by this base
            class (presumably consumed by sub-classes). Defaults to None.
        """
        self._dry_run = kwargs.get('dry_run', False)
        self._job_check_sleep = kwargs.get('job_check_sleep', None)

    @classmethod
    def check_job(cls, job_details):
        """Check the status of a specific job.

        Returns the result of `check_log` applied to
        `job_details.logfile`, using this class's `string_exited` and
        `string_successful` markers.
        """
        return check_log(job_details.logfile, cls.string_exited, cls.string_successful)

    def dispatch_job_hook(self, link, key, job_config, logfile, stream=sys.stdout):
        """Hook to dispatch a single job.

        Must be implemented by sub-classes; the base implementation
        always raises `NotImplementedError`.
        """
        raise NotImplementedError("SysInterface.dispatch_job_hook")

    def dispatch_job(self, link, key, job_archive, stream=sys.stdout):
        """Function to dispatch a single job

        Parameters
        ----------
        link : `Link`
            Link object that sends the job
        key : str
            Key used to identify this particular job
        job_archive : `JobArchive`
            Archive used to keep track of jobs; may be None, in which
            case the job is not registered.
        stream : file-like
            Passed through to `dispatch_job_hook`.

        Returns `JobDetails` object

        Raises
        ------
        KeyError
            If `key` is not present in `link.jobs`.
        """
        try:
            job_details = link.jobs[key]
        except KeyError:
            # Fail with a clear message instead of falling through to an
            # UnboundLocalError on `job_details` below.
            raise KeyError("Failed to find job %s in link.jobs" % (key,))

        job_config = job_details.job_config
        link.update_args(job_config)
        logfile = job_config['logfile']
        try:
            self.dispatch_job_hook(link, key, job_config, logfile, stream)
            job_details.status = JobStatus.running
        except IOError:
            # A dispatch that could not do its I/O is recorded as failed
            # rather than propagated.
            job_details.status = JobStatus.failed

        if job_archive is not None:
            job_archive.register_job(job_details)
        return job_details

    def submit_jobs(self, link, job_dict=None, job_archive=None, stream=sys.stdout):
        """Run the `Link` with all of the items job_dict as input.

        If job_dict is None, the job_dict will be taken from link.jobs.
        Jobs are dispatched in sorted key order.

        Returns a `JobStatus` enum: `JobStatus.failed` if any dispatch
        ended in the failed state, `JobStatus.done` otherwise.
        """
        failed = False
        if job_dict is None:
            job_dict = link.jobs

        for job_key, job_details in sorted(job_dict.items()):
            job_config = job_details.job_config
            # Clear leftovers of a previously failed attempt before retrying.
            if job_details.status == JobStatus.failed:
                clean_job(job_details.logfile,
                          job_details.outfiles, self._dry_run)
            job_config['logfile'] = job_details.logfile
            new_job_details = self.dispatch_job(
                link, job_key, job_archive, stream)
            if new_job_details.status == JobStatus.failed:
                failed = True
                clean_job(new_job_details.logfile,
                          new_job_details.outfiles, self._dry_run)
            link.jobs[job_key] = new_job_details

        if failed:
            return JobStatus.failed
        return JobStatus.done

    def clean_jobs(self, link, job_dict=None, clean_all=False):
        """Clean up all the jobs associated with this link.

        For every job that is in the failed state (or every job, if
        `clean_all` is True) the log file is removed and the status is
        reset to `JobStatus.ready`. If job_dict is None, link.jobs is
        used.

        Returns a `JobStatus` enum; this is always `JobStatus.done`.
        """
        if job_dict is None:
            job_dict = link.jobs

        for job_details in job_dict.values():
            if job_details.status == JobStatus.failed or clean_all:
                # Only the log file is removed here; the empty dict means
                # no output files are touched.
                clean_job(job_details.logfile, {}, self._dry_run)
                job_details.status = JobStatus.ready

        return JobStatus.done