Source code for fermipy.jobs.link

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
Module for class `Link`, which wraps a single command line application.
"""

from __future__ import absolute_import, division, print_function

import sys
import os
import copy
import argparse
import subprocess
import abc

from collections import OrderedDict

from fermipy.jobs.utils import is_null, is_not_null
from fermipy.jobs.file_archive import FileDict, FileStageManager
from fermipy.jobs.job_archive import get_timestamp, JobStatus, JobDetails
from fermipy.jobs.factory import LinkFactory
from fermipy.jobs.sys_interface import SysInterface


[docs]def extract_arguments(args, defaults): """Extract a set of arguments from a large dictionary Parameters ---------- args : dict Dictionary with the arguments values to use defaults : dict Dictionary with all the argument to extract, and default values for each Returns ------- out_dict : dict A dictionary with only the extracted arguments """ out_dict = convert_option_dict_to_dict(defaults) for key in defaults.keys(): mapped_val = args.get(key, None) if mapped_val is None: pass else: out_dict[key] = mapped_val return out_dict
[docs]def check_files(filelist, file_stage_manager=None, return_found=True, return_missing=True): """Check that all files in a list exist Parameters ---------- filelist : list The list of files we are checking for. file_stage_manager : `fermipy.jobs.file_archive.FileStageManager` A object that maps files to scratch space if needed. return_found : list A list with the paths of the files that were found. return_missing : list A list with the paths of the files that were missing. Returns ------- found : list List of the found files, if requested, otherwise `None` missing : list List of the missing files, if requested, otherwise `None` """ found = [] missing = [] none_count = 0 for fname in filelist: if fname is None: none_count += 1 continue if fname[0] == '@': fname = fname[1:] if os.path.exists(fname): found.append(fname) continue if os.path.exists(fname + '.gz'): found.append(fname) continue if file_stage_manager is not None: fname = file_stage_manager.get_scratch_path(fname) if os.path.exists(fname): found.append(fname) continue missing.append(fname) if return_found and return_missing: return found, missing elif return_found: return found elif return_missing: return missing return None
[docs]def add_argument(parser, dest, info): """ Add an argument to an `argparse.ArgumentParser` object Parameters ---------- parser : `argparse.ArgumentParser` The parser in question dest : str The destination for the argument info : `tuple` The information associated with the argument in question. """ default, helpstr, typeinfo = info if dest == 'args': parser.add_argument('args', nargs='+', default=None, help=helpstr) elif typeinfo == list: parser.add_argument('--%s' % dest, action='append', help=helpstr) elif typeinfo == bool: parser.add_argument('--%s' % dest, action='store_true', help=helpstr) else: parser.add_argument('--%s' % dest, action='store', type=typeinfo, default=default, help=helpstr)
[docs]def convert_value_to_option_tuple(value, helpstr=None): """Convert a value to a tuple of the form expected by `Link.options` Parameters ---------- value : The value we are converting helpstr : str The help string that will be associated to this option. Returns ------- option_info : tuple A 3-tuple with default value, helpstring and type for this particular option. """ if helpstr is None: helpstr = "Unknown" return (value, helpstr, type(value))
[docs]def convert_dict_to_option_dict(input_dict): """Convert a simple key-value dictionary to a dictionary of options tuples""" ret_dict = {} for key, value in input_dict.items(): ret_dict[key] = convert_value_to_option_tuple(value) return ret_dict
[docs]def convert_option_dict_to_dict(option_dict): """Convert a dictionary of options tuples to a simple key-value dictionary""" ret_dict = {} for key, value in option_dict.items(): if is_null(value): ret_dict[key] = None elif isinstance(value, tuple): ret_dict[key] = value[0] else: ret_dict[key] = value return ret_dict
[docs]def reduce_by_keys(orig_dict, keys, default=None): """Reduce a dictionary by selecting a set of keys """ ret = {} for key in keys: ret[key] = orig_dict.get(key, default) return ret