# Source code for fermipy.jobs.chain

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
Utilities to execute command line applications.

The main class is `Link`, which wraps a single command line application.

The `Chain` class inherits from `Link` and allows chaining together several
applications into a single object.
"""
from __future__ import absolute_import, division, print_function

import sys
import os
import copy

from collections import OrderedDict

from fermipy.jobs.file_archive import FileDict, FileStageManager
from fermipy.jobs.job_archive import get_timestamp, JobStatus, JobDetails


def extract_arguments(args, defaults, mapping):
    """Pull the values for a set of options out of a larger dictionary.

    Parameters
    ----------
    args : dict
        Dictionary holding the candidate argument values.
    defaults : dict
        The options to extract.  Its values (option tuples or plain
        values) supply the defaults for every key.
    mapping : dict or None
        Optional translation from a key in ``defaults`` to the key that
        should be looked up in ``args``.  This is useful:
        1) when two applications use different names for what is
        effectively the same parameter;
        2) when a chain holds several instances of the same application
        that need different argument values.

    Returns
    -------
    dict
        For every key of ``defaults``: the value found in ``args`` (under
        the mapped key if that value is not None, otherwise under the
        key itself) when it is not None, else the default value.
    """
    out_dict = convert_option_dict_to_dict(defaults)
    for name in defaults.keys():
        source_key = name
        if mapping is not None:
            try:
                candidate = mapping[name]
                # Only use the mapped key if it actually carries a value.
                if args.get(candidate, None) is not None:
                    source_key = candidate
            except KeyError:
                pass
        value = args.get(source_key, None)
        if value is not None:
            out_dict[name] = value
    return out_dict
def check_files(filelist,
                file_stage_manager=None,
                return_found=True,
                return_missing=True):
    """Check which files in a list exist.

    A file counts as found if it exists as given, if a gzipped copy
    (``fname + '.gz'``) exists, or -- when ``file_stage_manager`` is
    given -- if its copy in the scratch area exists.  `None` entries in
    ``filelist`` are skipped.

    Parameters
    ----------
    filelist : iterable of str or None
        The file names to check.
    file_stage_manager : object or None
        Object providing ``get_scratch_path(fname)`` (e.g. a
        `FileStageManager`), used to also look in the scratch area.
    return_found : bool
        Include the list of found files in the return value.
    return_missing : bool
        Include the list of missing files in the return value.

    Returns
    -------
    ``(found, missing)`` if both flags are set, only ``found`` or only
    ``missing`` if one flag is set, and `None` if neither is set.
    Files found only in the scratch area are reported by their scratch
    path; missing files are reported by the name given in ``filelist``.
    """
    found = []
    missing = []
    for fname in filelist:
        if fname is None:
            continue
        if os.path.exists(fname) or os.path.exists(fname + '.gz'):
            found.append(fname)
            continue
        if file_stage_manager is not None:
            # Keep ``fname`` untouched so a missing file is reported under
            # the name the caller asked about, not its scratch location.
            scratch_path = file_stage_manager.get_scratch_path(fname)
            if os.path.exists(scratch_path):
                found.append(scratch_path)
                continue
        missing.append(fname)

    if return_found and return_missing:
        return found, missing
    if return_found:
        return found
    if return_missing:
        return missing
    return None
def add_argument(parser, dest, info):
    """Register one option on an `argparse.ArgumentParser`.

    Parameters
    ----------
    parser : `argparse.ArgumentParser`
        The parser to extend.
    dest : str
        Name of the option.
    info : tuple
        ``(default, helpstr, typeinfo)`` option tuple.  A ``list`` type
        becomes a positional argument taking one or more values (default
        None); a ``bool`` type becomes a ``--dest`` store_true flag; any
        other type becomes a ``--dest`` option converted with
        ``typeinfo`` and defaulting to ``default``.
    """
    default, helpstr, typeinfo = info

    if typeinfo == list:
        parser.add_argument('%s' % dest, nargs='+', default=None,
                            help=helpstr)
        return

    if typeinfo == bool:
        parser.add_argument('--%s' % dest, action='store_true',
                            help=helpstr)
        return

    parser.add_argument('--%s' % dest, action='store', type=typeinfo,
                        default=default, help=helpstr)
def convert_value_to_option_tuple(value, helpstr=None):
    """Wrap a bare value in the option tuple format used by `Link.options`.

    Parameters
    ----------
    value : object
        The default value of the option.
    helpstr : str or None
        Help string; ``"Unknown"`` is used when None.

    Returns
    -------
    tuple
        ``(value, helpstr, type(value))``
    """
    description = "Unknown" if helpstr is None else helpstr
    return value, description, type(value)
def convert_dict_to_option_dict(input_dict):
    """Turn a plain ``{key: value}`` dictionary into ``{key: option tuple}``.

    Each value is wrapped with `convert_value_to_option_tuple`, so the
    help string is ``"Unknown"`` and the type is ``type(value)``.
    """
    return {key: convert_value_to_option_tuple(value)
            for key, value in input_dict.items()}
def convert_option_dict_to_dict(option_dict):
    """Reduce a dictionary of option tuples to a plain ``{key: value}`` dict.

    Tuple values are replaced by their first element (the default value);
    any other value, including None, is copied unchanged.
    """
    return {key: (value[0] if isinstance(value, tuple) else value)
            for key, value in option_dict.items()}
class Chain(Link):
    """ An object tying together a series of applications into a
    single application.

    This class keeps track of the arguments to pass to the applications
    as well as input and output files.

    Note that this class is itself a `Link`.  This allows you
    to write a python module that implements a chain and also has a
    __main__ function to allow it to be called from the shell.

    Parameters
    ----------
    argmapper : function or None
        Function that maps input options (in self._options) to the
        format that is passed to the links in the chains.
        If None, then no mapping is applied.
        This is useful if you want to build a complicated set of options
        from a few inputs.
    """

    def __init__(self, linkname, links, **kwargs):
        """ C'tor

        Parameters
        ----------
        linkname : str
            Unique name of this particular link

        links : list
            A list of `Link` objects

        Keyword arguments
        -----------------
        argmapper : function or None
            Function that maps input options (in self._options) to the
            format that is passed to the links in the chains.
        """
        Link.__init__(self, linkname, **kwargs)
        self._argmapper = kwargs.get('argmapper', None)
        self.update_options(self.args.copy())
        # Keyed by linkname; insertion order is the order links are run in.
        self._links = OrderedDict()
        for link in links:
            self._links[link.linkname] = link

    @property
    def links(self):
        """ Return the `OrderedDict` of `Link` objects, keyed by linkname """
        return self._links

    @property
    def argmapper(self):
        """ Return the argument mapping function, if it exists """
        return self._argmapper

    def __getitem__(self, key):
        """ Return the `Link` whose linkname is key """
        return self._links[key]

    def _latch_file_info(self):
        """ Internal function to update the dictionaries
        keeping track of input and output files
        """
        remapped = self.map_arguments(self.args)
        self.files.latch_file_info(remapped)
        # Rebuild the aggregate from this chain's own files plus the files
        # (and sub-files) of every link in the chain.
        self.sub_files.file_dict.clear()
        self.sub_files.update(self.files.file_dict)
        for link in self._links.values():
            self.sub_files.update(link.files.file_dict)
            self.sub_files.update(link.sub_files.file_dict)

    def print_summary(self, stream=sys.stdout, indent="", recurse_level=2):
        """ Print a summary of the activity done by this `Chain`.

        Parameters
        -----------
        stream : `file`
            Stream to print to
        indent : str
            Indentation at start of line
        recurse_level : int
            Number of recursion levels to print
        """
        Link.print_summary(self, stream, indent, recurse_level)
        if recurse_level > 0:
            recurse_level -= 1
            indent += " "
            for link in self._links.values():
                stream.write("\n")
                link.print_summary(stream, indent, recurse_level)

    def get_jobs(self, recursive=True):
        """ Return a dictionary with all the jobs

        If recursive is True this will include jobs from internal `Link`
        objects; otherwise ``self.jobs`` itself is returned.
        """
        if not recursive:
            return self.jobs
        ret_dict = self.jobs.copy()
        for link in self._links.values():
            ret_dict.update(link.get_jobs(recursive))
        return ret_dict

    @staticmethod
    def _merge_link_file_dicts(link_dicts):
        """ Merge ``{filename: [linkname, ...]}`` dicts, concatenating the
        lists of filenames that appear in more than one dict.

        New lists are built so the lists owned by the input dicts are never
        modified.
        """
        ret_dict = OrderedDict()
        for link_dict in link_dicts:
            for key, value in link_dict.items():
                ret_dict.setdefault(key, []).extend(value)
        return ret_dict

    def missing_input_files(self):
        """ Return a dictionary of the missing input files and the
        `Link` objects they are associated with
        """
        return self._merge_link_file_dicts(
            link.missing_input_files() for link in self._links.values())

    def missing_output_files(self):
        """ Return a dictionary of the missing output files and the
        `Link` objects they are associated with
        """
        return self._merge_link_file_dicts(
            link.missing_output_files() for link in self._links.values())

    def map_arguments(self, args):
        """ Map arguments to options.

        This will use self._argmapper if it is defined and returns a value
        other than None.

        Parameters
        -------------
        args : dict or `Namespace`
            If a namespace is given, it will be cast to dict

        Returns dict
        """
        mapped = None
        if self._argmapper is not None:
            mapped = self._argmapper(args)
        if mapped is not None:
            return mapped
        # Check for a dict first: some dict subclasses (e.g. OrderedDict)
        # also have an (empty) instance __dict__, which must not be
        # returned in place of their contents.
        if isinstance(args, dict):
            return args
        try:
            return args.__dict__
        except AttributeError:
            return args

    def run_chain(self, stream=sys.stdout, dry_run=False, stage_files=True):
        """ Run all the links in the chain

        Parameters
        -----------
        stream : `file`
            Stream to print to
        dry_run : bool
            Print commands but do not run them
        stage_files : bool
            Stage files to and from the scratch area
        """
        # NOTE(review): a call to self.pre_run_checks(stream, dry_run) used
        # to be here but was commented out; confirm whether to restore it.
        if self._file_stage is not None:
            input_file_mapping, output_file_mapping = self.map_scratch_files(
                self.sub_files)
            if stage_files:
                self._file_stage.make_scratch_dirs(input_file_mapping, dry_run)
                self._file_stage.make_scratch_dirs(
                    output_file_mapping, dry_run)
                self.stage_input_files(input_file_mapping, dry_run)

        for link in self._links.values():
            # Report progress on the caller-supplied stream, not stdout.
            print("Running link ", link.linkname, file=stream)
            # Staging is handled once for the whole chain, so sub-links
            # are told not to stage.
            link.run_link(stream=stream, dry_run=dry_run, stage_files=False)

        if self._file_stage is not None and stage_files:
            self.stage_output_files(output_file_mapping, dry_run)

    def run(self, stream=sys.stdout, dry_run=False):
        """ Run the chain; equivalent to ``run_chain(stream, dry_run)`` """
        self.run_chain(stream, dry_run)

    def update_args(self, override_args):
        """ Update the argument used to invoke the application

        Note that this will also update the dictionary of input
        and output files.

        Parameters
        -----------
        override_args : dict
            dictionary passed to the links
        """
        self.args = extract_arguments(override_args, self.args, self.mapping)
        remapped = self.map_arguments(override_args)
        scratch_dir = self.args.get('scratch', None)
        if scratch_dir is not None:
            self._file_stage = FileStageManager(scratch_dir, '.')
        for link in self._links.values():
            link.update_args(remapped)
            link.set_file_stage(self._file_stage)
        self._latch_file_info()