Source code for fermipy.jobs.link

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
Module for class `Link`, which wraps a single command line application.
"""

from __future__ import absolute_import, division, print_function

import sys
import os
import copy
import argparse
import subprocess
import abc

from collections import OrderedDict

from fermipy.jobs.utils import is_null, is_not_null
from fermipy.jobs.file_archive import FileDict, FileStageManager
from fermipy.jobs.job_archive import get_timestamp, JobStatus, JobDetails
from fermipy.jobs.factory import LinkFactory
from fermipy.jobs.sys_interface import SysInterface


def extract_arguments(args, defaults):
    """Extract a set of arguments from a large dictionary

    Parameters
    ----------

    args : dict
        Dictionary with the arguments values to use

    defaults : dict
        Dictionary with all the argument to extract, and default values for each

    Returns
    -------

    out_dict : dict
        A dictionary with only the extracted arguments

    """
    out_dict = convert_option_dict_to_dict(defaults)
    for key in defaults.keys():
        mapped_val = args.get(key, None)
        if mapped_val is None:
            pass
        else:
            out_dict[key] = mapped_val
    return out_dict


def check_files(filelist,
                file_stage_manager=None,
                return_found=True,
                return_missing=True):
    """Check that all files in a list exist

    Parameters
    ----------

    filelist : list
        The list of files we are checking for.

    file_stage_manager : `fermipy.jobs.file_archive.FileStageManager`
        A object that maps files to scratch space if needed.

    return_found : list
        A list with the paths of the files that were found.

    return_missing : list
        A list with the paths of the files that were missing.

    Returns
    -------
    found : list
        List of the found files, if requested, otherwise `None`

    missing : list
        List of the missing files, if requested, otherwise `None`

    """
    found = []
    missing = []
    none_count = 0
    for fname in filelist:

        if fname is None:
            none_count += 1
            continue
        if fname[0] == '@':
            fname = fname[1:]
        if os.path.exists(fname):
            found.append(fname)
            continue
        if os.path.exists(fname + '.gz'):
            found.append(fname)
            continue
        if file_stage_manager is not None:
            fname = file_stage_manager.get_scratch_path(fname)
            if os.path.exists(fname):
                found.append(fname)
                continue
        missing.append(fname)
    if return_found and return_missing:
        return found, missing
    elif return_found:
        return found
    elif return_missing:
        return missing
    return None


def add_argument(parser, dest, info):
    """ Add an argument to an `argparse.ArgumentParser` object

    Parameters
    ----------
    parser : `argparse.ArgumentParser`
        The parser in question

    dest : str
        The destination for the argument

    info : `tuple`
        The information associated with the argument in question.

    """
    default, helpstr, typeinfo = info

    if dest == 'args':
        parser.add_argument('args', nargs='+', default=None, help=helpstr)
    elif typeinfo == list:
        parser.add_argument('--%s' % dest, action='append', help=helpstr)
    elif typeinfo == bool:
        parser.add_argument('--%s' % dest, action='store_true', help=helpstr)
    else:
        parser.add_argument('--%s' % dest, action='store', type=typeinfo,
                            default=default, help=helpstr)


def convert_value_to_option_tuple(value, helpstr=None):
    """Convert a value to a tuple of the form expected by `Link.options`

    Parameters
    ----------
    value :
        The value we are converting

    helpstr : str
        The help string that will be associated to this option.

    Returns
    -------
    option_info : tuple
        A 3-tuple with default value, helpstring and type for this particular option.

    """
    if helpstr is None:
        helpstr = "Unknown"
    return (value, helpstr, type(value))


def convert_dict_to_option_dict(input_dict):
    """Convert a simple key-value dictionary to a dictionary of options tuples"""
    ret_dict = {}
    for key, value in input_dict.items():
        ret_dict[key] = convert_value_to_option_tuple(value)
    return ret_dict


def convert_option_dict_to_dict(option_dict):
    """Convert a dictionary of options tuples to a simple key-value dictionary"""
    ret_dict = {}
    for key, value in option_dict.items():
        if is_null(value):
            ret_dict[key] = None
        elif isinstance(value, tuple):
            ret_dict[key] = value[0]
        else:
            ret_dict[key] = value
    return ret_dict

def reduce_by_keys(orig_dict, keys, default=None):
    """Reduce a dictionary by selecting a set of keys """
    ret = {}
    for key in keys:
        ret[key] = orig_dict.get(key, default)
    return ret