fermipy.jobs subpackage

fermipy.jobs.chain module

Utilities to execute command line applications.

The main class is Link, which wraps a single command line application.

The Chain class inherits from Link and allows chaining together several applications into a single object.

class fermipy.jobs.chain.Chain(linkname, links, **kwargs)[source]

Bases: fermipy.jobs.chain.Link

An object tying together a series of applications into a single application.

This class keeps track of the arguments to pass to the applications as well as input and output files.

Note that this class is itself a Link. This allows you to write a python module that implements a chain and also has a __main__ function to allow it to be called from the shell.

Parameters:argmapper (function or None) – Function that maps input options (in self._options) to the format that is passed to the links in the chains. If None, then no mapping is applied. This is useful if you want to build a complicated set of options from a few inputs.
__getitem__(key)[source]

Return the Link whose linkname is key

Append link to this Chain

argmapper

Return the argument mapping function, if it exists

get_jobs(recursive=True)[source]

Return a dictionary with all the jobs

If recursive is True this will include jobs from internal Link

Return the list of links

map_arguments(args)[source]

Map arguments to options.

This will use self._argmapper if it is defined.

Parameters:
  • args (dict or Namespace) – If a namespace is given, it will be cast to dict
Returns: dict
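
The mapping step described above can be sketched as follows. This is an illustrative stand-in written for Python 3, not the fermipy implementation: `map_arguments_sketch` and `expand_target` are hypothetical names, and the option names in the example are made up.

```python
from argparse import Namespace


def map_arguments_sketch(args, argmapper=None):
    """Cast ``args`` to a dict, then apply ``argmapper`` if one is defined."""
    if isinstance(args, Namespace):
        args = vars(args)  # Namespace -> dict
    args = dict(args)      # copy so the caller's dict is left untouched
    if argmapper is None:
        return args        # no mapping applied
    return argmapper(args)


def expand_target(opts):
    """Hypothetical argmapper: build several file options from one input."""
    target = opts["target"]
    return {"evfile": target + "_ft1.fits", "outfile": target + "_ccube.fits"}


mapped = map_arguments_sketch(Namespace(target="vela"), expand_target)
print(mapped)
```

The point of the sketch is the "few inputs, many options" pattern: a single `target` value is expanded into the full set of options that the links would receive.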
missing_input_files()[source]

Return a dictionary of the missing input files and Link they are associated with

missing_output_files()[source]

Return a dictionary of the missing output files and Link they are associated with

print_summary(stream=<open file '<stdout>', mode 'w'>, indent='', recurse_level=2)[source]

Print a summary of the activity done by this Chain.

Parameters:
  • stream (file) – Stream to print to
  • indent (str) – Indentation at start of line
  • recurse_level (int) – Number of recursion levels to print
run(stream=<open file '<stdout>', mode 'w'>, dry_run=False)[source]
run_chain(stream=<open file '<stdout>', mode 'w'>, dry_run=False, stage_files=True)[source]

Run all the links in the chain

Parameters:
  • stream (file) – Stream to print to
  • dry_run (bool) – Print commands but do not run them
  • stage_files (bool) – Stage files to and from the scratch area
update_args(override_args)[source]

Update the arguments used to invoke the application

Note that this will also update the dictionary of input and output files.

Parameters:override_args (dict) – dictionary passed to the links

Bases: object

A wrapper for a command line application.

This class keeps track of the arguments to pass to the application as well as input and output files.

This can be used either with other Link objects to build a chain, or as a standalone wrapper to pass configuration to the application.

Parameters:
  • appname (str) – Name of the application
  • args (dict) – Up-to-date dictionary with the arguments that will be passed to the application
  • defaults (dict) – Dictionary with default values for the arguments
  • options (dict) – Dictionary with the options that we are allowed to set and default values
  • mapping (dict) – Dictionary remapping keys in options to arguments sent to the application. This is useful when two ScienceTools use different names for what is effectively the same parameter
  • files (FileDict) – Object that keeps track of input and output files
  • jobs (OrderedDict) – Dictionary mapping keys to JobDetails
arg_names

Return the list of arg names

check_input_files(return_found=True, return_missing=True)[source]

Check if input files exist.

Return two lists: (found, missing)

check_output_files(return_found=True, return_missing=True)[source]

Check if output files exist.

Return two lists: (found, missing)

command_template()[source]

Build and return a string that can be used as a template for invoking this chain from the command line.

The actual command can be obtained by using self.command_template().format(**self.args)
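
A minimal sketch of the template-then-format idea, assuming a `key=value` argument style. The exact template layout produced by the library is not shown in this reference, so `command_template_sketch` and its output format are assumptions made for illustration.

```python
def command_template_sketch(appname, arg_names):
    """Build a template such as 'app key={key} ...' (layout is an assumption)."""
    parts = [appname] + ["%s={%s}" % (name, name) for name in arg_names]
    return " ".join(parts)


args = {"evfile": "ft1.fits", "outfile": "ccube.fits"}
template = command_template_sketch("gtbin", sorted(args))
command = template.format(**args)  # fill the placeholders from the args dict
print(template)
print(command)
```

Keeping the template separate from the values means the same template can be formatted many times with different argument dictionaries.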

create_job_details(key, job_config, logfile, status)[source]

Create a JobDetails for a single job

Parameters:
  • key (str) – Key used to identify this particular job
  • job_config (dict) – Dictionary with arguments passed to this particular job
  • logfile (str) – Name of the associated log file
  • status (int) – Current status of the job
Returns: JobDetails
fill_argparser(parser)[source]

Fill an argparse.ArgumentParser with the options from this chain

finalize(dry_run=False)[source]

Remove / compress files as requested

formatted_command()[source]

Build and return the formatted command for this Link.

This is exactly the command as called from the Unix command line.

get_failed_jobs()[source]

Return a dictionary with the subset of jobs that are marked as failed

get_jobs(recursive=True)[source]

Return a dictionary with all the jobs

If recursive is True this will include jobs from internal Link

make_argv()[source]

Generate the vector of arguments for this Link.

This is exactly the ‘argv’ generated for the command as called from the Unix command line.

map_scratch_files(file_dict)[source]

Build and return the mapping for copying files to and from scratch area

missing_input_files()[source]

Make and return a dictionary of the missing input files.

This returns a dictionary mapping filepath to list of links that use the file as input.

missing_output_files()[source]

Make and return a dictionary of the missing output files.

This returns a dictionary mapping filepath to list of links that produce the file as output.

pre_run_checks(stream=<open file '<stdout>', mode 'w'>, dry_run=False)[source]

Do some checks before running this link

This checks if input and output files are present.

If input files are missing, this will raise OSError if dry_run is False. If all output files are present, this returns False.

Parameters:
  • stream (file) – Must have ‘write’ function
  • dry_run (bool) – Print command but do not run it
Returns: bool – True if it is ok to proceed with running the link
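
The decision logic described for pre_run_checks can be sketched as below. This is a stand-alone illustration for Python 3, not the library code: `pre_run_checks_sketch` is a hypothetical function that takes explicit file lists, and treating an empty output list as "proceed" is an assumption.

```python
import os
import sys
import tempfile


def pre_run_checks_sketch(input_files, output_files, stream=sys.stdout, dry_run=False):
    """Return True if it is ok to run, False if all outputs already exist."""
    missing_in = [path for path in input_files if not os.path.exists(path)]
    if missing_in:
        if not dry_run:
            raise OSError("Missing input files: %s" % missing_in)
        stream.write("Missing input files: %s\n" % missing_in)
    # Assumption: with no declared outputs there is nothing to skip
    if output_files and all(os.path.exists(path) for path in output_files):
        stream.write("All output files present, skipping\n")
        return False
    return True


with tempfile.TemporaryDirectory() as tmpdir:
    infile = os.path.join(tmpdir, "in.fits")
    outfile = os.path.join(tmpdir, "out.fits")
    open(infile, "w").close()
    ok_before = pre_run_checks_sketch([infile], [outfile])  # output still missing
    open(outfile, "w").close()
    ok_after = pre_run_checks_sketch([infile], [outfile])   # output now present
```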
print_summary(stream=<open file '<stdout>', mode 'w'>, indent='', recurse_level=2)[source]

Print a summary of the activity done by this Link.

Parameters:
  • stream (file) – Stream to print to
  • indent (str) – Indentation at start of line
  • recurse_level (int) – Number of recursion levels to print
register_job(key, job_config, logfile, status)[source]

Create a JobDetails for this link and add it to the self.jobs dictionary.

Parameters:
  • key (str) – Key used to identify this particular job
  • job_config (dict) – Dictionary with arguments passed to this particular job
  • logfile (str) – Name of the associated log file
  • status (int) – Current status of the job
Returns: JobDetails
run(stream=<open file '<stdout>', mode 'w'>, dry_run=False)[source]

Runs this link.

This version is intended to be overridden by sub-classes so as to provide a single function that behaves the same for all versions of Link

Parameters:
  • stream (file) – Must have ‘write’ function
  • dry_run (bool) – Print command but do not run it
run_argparser(argv)[source]

Initialize a link with a set of arguments using an argparse.ArgumentParser

run_command(stream=<open file '<stdout>', mode 'w'>, dry_run=False)[source]

Runs the command for this link. This method can be overridden by sub-classes to invoke a different command

Parameters:
  • stream (file) – Must have ‘write’ function
  • dry_run (bool) – Print command but do not run it

Runs this link.

This checks if input and output files are present.

If input files are missing, this will raise OSError if dry_run is False. If all output files are present, this will skip execution.

Parameters:
  • stream (file) – Must have ‘write’ function
  • dry_run (bool) – Print command but do not run it
  • stage_files (bool) – Stage files to and from the scratch area
set_file_stage(file_stage)[source]

Set this link to use a FileStageManager to copy files to and from a scratch area

stage_input_files(file_mapping, dry_run=True)[source]

Stage the input files to the scratch area and adjust the arguments accordingly

stage_output_files(file_mapping, dry_run=True)[source]

Stage the output files from the scratch area and adjust the arguments accordingly

update_args(override_args)[source]

Update the arguments used to invoke the application

Note that this will also update the dictionary of input and output files.

Parameters:override_args (dict) – Dictionary of arguments to override the current values
update_file_args(file_mapping)[source]

Adjust the arguments to deal with staging files to the scratch area

update_options(input_dict)[source]

Update the values in self.options

Parameters:
  • input_dict (dict) – Dictionary with argument key : value pairings
Inserts values into self._options.
update_sub_file_dict(sub_files)[source]

Update a file dict with information from self

fermipy.jobs.chain.add_argument(parser, dest, info)[source]

Add an argument to an argparse.ArgumentParser object
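
As an illustration of how an option tuple of the form (default, help string, type) could be turned into a parser argument, here is a hedged sketch. `add_argument_sketch` is a hypothetical helper; the `--name` flag style and the special handling of bool and list options are assumptions, not a description of the library function.

```python
import argparse


def add_argument_sketch(parser, dest, info):
    """Add one option described by a (default, help, type) tuple."""
    default, helpstr, typeinfo = info
    flag = "--%s" % dest
    if typeinfo is bool:
        # Boolean options become on/off switches
        parser.add_argument(flag, action="store_true", default=default, help=helpstr)
    elif typeinfo is list:
        # List options accept zero or more values
        parser.add_argument(flag, nargs="*", default=default, help=helpstr)
    else:
        parser.add_argument(flag, type=typeinfo, default=default, help=helpstr)


parser = argparse.ArgumentParser()
options = {
    "dry_run": (False, "Print commands, but do not execute them", bool),
    "max_jobs": (500, "Limit on the number of running or queued jobs.", int),
}
for dest, info in options.items():
    add_argument_sketch(parser, dest, info)

parsed = parser.parse_args(["--max_jobs", "10", "--dry_run"])
defaults = parser.parse_args([])
```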

fermipy.jobs.chain.check_files(filelist, file_stage_manager=None, return_found=True, return_missing=True)[source]

Check that all files in a list exist

Return two lists: (found, missing)
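
A minimal sketch of the (found, missing) split, assuming plain local paths. `check_files_sketch` is a hypothetical stand-in that leaves out the file_stage_manager argument entirely.

```python
import os


def check_files_sketch(filelist, return_found=True, return_missing=True):
    """Split ``filelist`` into paths that exist and paths that do not."""
    found, missing = [], []
    for path in filelist:
        if os.path.exists(path):
            if return_found:
                found.append(path)
        elif return_missing:
            missing.append(path)
    return found, missing


# "." always exists; the second name is assumed not to exist
found, missing = check_files_sketch([".", "no_such_file_for_sketch.fits"])
print(found, missing)
```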

fermipy.jobs.chain.convert_dict_to_option_dict(input_dict)[source]

Convert a simple key-value dictionary to a dictionary of options tuples

fermipy.jobs.chain.convert_option_dict_to_dict(option_dict)[source]

Convert a dictionary of options tuples to a simple key-value dictionary

fermipy.jobs.chain.convert_value_to_option_tuple(value, helpstr=None)[source]

Convert a value to a tuple of the form expected by Link.options

Returns (value, helpstr, type(value))
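
The three conversion helpers above can be sketched together. The function names below are shortened, hypothetical versions; the only thing taken from this reference is the tuple layout (value, helpstr, type(value)).

```python
def value_to_option_tuple(value, helpstr=None):
    """Wrap a value as (value, help string, type)."""
    return (value, helpstr, type(value))


def dict_to_option_dict(input_dict):
    """Convert {key: value} into {key: (value, None, type(value))}."""
    return {key: value_to_option_tuple(val) for key, val in input_dict.items()}


def option_dict_to_dict(option_dict):
    """Convert {key: (value, help, type)} back into {key: value}."""
    return {key: opt[0] for key, opt in option_dict.items()}


simple = {"dry_run": False, "max_jobs": 500}
options = dict_to_option_dict(simple)
round_trip = option_dict_to_dict(options)
print(options["max_jobs"])
```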

fermipy.jobs.chain.extract_arguments(args, defaults, mapping)[source]

Extract a set of arguments from a large dictionary

Parameters:
  • args (dict) – Dictionary with the arguments values to use
  • defaults (dict) – Dictionary with all the arguments to extract, and default values for each
  • mapping (dict) – Dictionary mapping key in defaults to key in args. This is useful 1) when two applications use different names for what is effectively the same parameter, and 2) when you want to build a chain with multiple instances of the same application and pass different argument values to the different instances
Returns: dict filled with the arguments to pass to gtapp
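
A sketch of the extraction with remapping, under the assumption that a key missing from args falls back to its default. `extract_arguments_sketch` and the parameter names in the example are hypothetical.

```python
def extract_arguments_sketch(args, defaults, mapping=None):
    """Pick the keys listed in ``defaults`` out of ``args``, with remapping."""
    out = {}
    for key, default in defaults.items():
        # Look the key up under its remapped name, if a mapping is given
        source_key = mapping.get(key, key) if mapping else key
        out[key] = args.get(source_key, default)
    return out


args = {"ft1file": "events.fits", "emin": 100.0, "unrelated": 42}
defaults = {"evfile": None, "emin": 30.0, "emax": 1e5}
mapping = {"evfile": "ft1file"}  # the application calls it evfile, the chain ft1file
extracted = extract_arguments_sketch(args, defaults, mapping)
print(extracted)
```

Here `evfile` is filled from `ft1file`, `emin` is overridden by the value in args, `emax` keeps its default, and `unrelated` is dropped.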

fermipy.jobs.file_archive module

Classes and utilities to keep track of files associated with an analysis.

The main class is FileArchive, which keeps track of all the files associated with an analysis.

The FileHandle helper class encapsulates information on a particular file.

class fermipy.jobs.file_archive.FileArchive(**kwargs)[source]

Bases: object

Class that keeps track of the status of files used in an analysis

Parameters:
__getitem__(key)[source]

Return the FileHandle whose linkname is key

base_path

Return the base file path for all files in this FileArchive

static build_archive(**kwargs)[source]

Return the singleton FileArchive instance, building it if needed

cache

Return the transient representation of this FileArchive

static get_archive()[source]

Return the singleton FileArchive instance

get_file_ids(file_list, creator=None, status=0, file_dict=None)[source]

Get or create a list of file ids based on file names

Parameters:
  • file_list (list) – The paths to the files
  • creator (int) – A unique key for the job that created these files
  • status (FileStatus) – Enumeration giving current status of files
  • file_dict (FileDict) – Mask giving flags set on this file
Returns: list of integers
get_file_paths(id_list)[source]

Get a list of file paths based on a set of ids

Parameters:
  • id_list (list) – List of integer file keys
Returns: list of file paths
get_handle(filepath)[source]

Get the FileHandle object associated to a particular file

register_file(filepath, creator, status=0, flags=0)[source]

Register a file in the archive.

If the file already exists, this raises a KeyError

Parameters:
  • filepath (str) – The path to the file
  • creator (int) – A unique key for the job that created this file
  • status (FileStatus) – Enumeration giving current status of file
  • flags (FileFlags) – Enumeration giving flags set on this file
Returns: FileHandle
table

Return the persistent representation of this FileArchive

table_file

Return the path to the file used to persist this FileArchive

update_file(filepath, creator, status)[source]

Update a file in the archive

If the file does not exist, this raises a KeyError

Parameters:
  • filepath (str) – The path to the file
  • creator (int) – A unique key for the job that created this file
  • status (FileStatus) – Enumeration giving current status of file
Returns: FileHandle
update_file_status()[source]

Update the status of all the files in the archive

write_table_file(table_file=None)[source]

Write the table to self._table_file

class fermipy.jobs.file_archive.FileDict(**kwargs)[source]

Bases: object

Small class to keep track of files used & created by a link.

Parameters:
  • file_args (dict) – Dictionary mapping argument [str] to FileFlags enum
  • file_dict (dict) – Dictionary mapping file path [str] to FileFlags enum
chain_input_files

Return a list of the input files needed by this chain.

For Link sub-classes this will return only those files that were not created by any internal Link

chain_output_files

Return a list of all the output files produced by this link.

For Link sub-classes this will return only those files that were not marked as internal files or marked for removal.

gzip_files

Return a list of the files compressed by this link.

This returns all files that were explicitly marked for compression.

input_files

Return a list of the input files needed by this link.

For Link sub-classes this will return the union of all the input files of each internal Link.

That is to say this will include files produced by one Link in a Chain and used as input to another Link in the Chain

input_files_to_stage

Return a list of the input files needed by this link.

For Link sub-classes this will return the union of all the input files of each internal Link.

That is to say this will include files produced by one Link in a Chain and used as input to another Link in the Chain

internal_files

Return a list of the intermediate files produced by this link.

This returns all files that were explicitly marked as internal files.

items()[source]

Return iterator over self.file_dict

latch_file_info(args)[source]

Extract the file paths from a set of arguments

output_files

Return a list of the output files produced by this link.

For Link sub-classes this will return the union of all the output files of each internal Link.

That is to say this will include files produced by one Link in a Chain and used as input to another Link in the Chain

output_files_to_stage

Return a list of the input files needed by this link.

For Link sub-classes this will return the union of all the input files of each internal Link.

That is to say this will include files produced by one Link in a Chain and used as input to another Link in the Chain

print_chain_summary(stream=<open file '<stdout>', mode 'w'>, indent='')[source]

Print a summary of the files in this file dict.

This version uses chain_input_files and chain_output_files to count the input and output files.

print_summary(stream=<open file '<stdout>', mode 'w'>, indent='')[source]

Print a summary of the files in this file dict.

This version explicitly counts the union of all input and output files.

temp_files

Return a list of the temporary files produced by this link.

This returns all files that were explicitly marked for removal.

update(file_dict)[source]

Update self with values from a dictionary mapping file path [str] to FileFlags enum

class fermipy.jobs.file_archive.FileFlags[source]

Bases: object

Bit masks to indicate file types

gz_mask = 8
in_ch_mask = 23
in_stage_mask = 33
input_mask = 1
internal_mask = 16
no_flags = 0
out_ch_mask = 22
out_stage_mask = 34
output_mask = 2
rm_mask = 4
rmint_mask = 20
stageable = 32
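
The composite values listed above are exactly the bitwise ORs of the single-bit flags, which the following sketch checks. The decomposition follows from the binary values themselves; the `has_flag` helper is hypothetical and only shows one way such masks can be tested.

```python
# Single-bit flags, with the values listed above
input_mask, output_mask, rm_mask = 1, 2, 4
gz_mask, internal_mask, stageable = 8, 16, 32

# Composite masks rebuilt as bitwise ORs of the single-bit flags
in_ch_mask = input_mask | output_mask | rm_mask | internal_mask   # 23
out_ch_mask = output_mask | rm_mask | internal_mask               # 22
in_stage_mask = input_mask | stageable                            # 33
out_stage_mask = output_mask | stageable                          # 34
rmint_mask = rm_mask | internal_mask                              # 20


def has_flag(flags, mask):
    """Hypothetical helper: True if any bit of ``mask`` is set in ``flags``."""
    return bool(flags & mask)


flags = output_mask | rm_mask  # an output file that is marked for removal
print(has_flag(flags, rm_mask), has_flag(flags, gz_mask))
```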
class fermipy.jobs.file_archive.FileHandle(**kwargs)[source]

Bases: object

Class to keep track of information about a file.

Parameters:
  • key (int) – Unique id for this particular file
  • creator (int) – Unique id for the job that created this file
  • timestamp (int) – File creation time cast as an int
  • status (FileStatus) – Enum giving current status of file
  • flags (FileFlags) – Mask giving flags set on this file
  • path (str) – Path to file
append_to_table(table)[source]

Add this instance as a row on an astropy.table.Table

check_status(basepath=None)[source]

Check on the status of this particular file

static create_from_row(table_row)[source]

Build and return a FileHandle from an astropy.table.row.Row

static make_dict(table)[source]

Build and return a dict of FileHandle from an astropy.table.Table

The dictionary is keyed by FileHandle.key, which is a unique integer for each file

static make_table(file_dict)[source]

Build and return an astropy.table.Table to store FileHandle

update_table_row(table, row_idx)[source]

Update the values in an astropy.table.Table for this instance

class fermipy.jobs.file_archive.FileStageManager(scratchdir, workdir)[source]

Bases: object

Small class to deal with staging files to and from a scratch area

construct_scratch_path(dirname, basename)[source]

Construct and return a path in the scratch area.

This will be <self.scratchdir>/<dirname>/<basename>

static copy_from_scratch(file_mapping, dry_run=True)[source]

Copy output files from scratch area

static copy_to_scratch(file_mapping, dry_run=True)[source]

Copy input files to scratch area

get_scratch_path(local_file)[source]

Construct and return a path in the scratch area from a local file.

static make_scratch_dirs(file_mapping, dry_run=True)[source]

Make any directories needed in the scratch area

map_files(local_files)[source]

Build a dictionary mapping local paths to scratch paths.

Parameters:
  • local_files (list) – List of filenames to be mapped to scratch area
Returns: dict – Mapping local_file : fullpath of scratch file
split_local_path(local_file)[source]

Split the local path into a directory name and a file name

If local_file is in self.workdir or a subdirectory of it, the directory will consist of the relative path from workdir.

If local_file is not in self.workdir, directory will be empty.

Returns (dirname, basename)
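
The path handling described for split_local_path, construct_scratch_path and map_files can be sketched with os.path. The functions below are hypothetical stand-ins that take workdir and scratchdir as explicit arguments; returning an empty directory for a file that sits directly in workdir is an assumption.

```python
import os


def split_local_path_sketch(local_file, workdir):
    """Return (dirname, basename), with dirname relative to workdir."""
    abs_dir, basename = os.path.split(os.path.abspath(local_file))
    rel_dir = os.path.relpath(abs_dir, os.path.abspath(workdir))
    outside = rel_dir == os.pardir or rel_dir.startswith(os.pardir + os.sep)
    if outside or rel_dir == os.curdir:
        return "", basename
    return rel_dir, basename


def map_files_sketch(local_files, scratchdir, workdir):
    """Map each local path to <scratchdir>/<dirname>/<basename>."""
    mapping = {}
    for local_file in local_files:
        dirname, basename = split_local_path_sketch(local_file, workdir)
        mapping[local_file] = os.path.join(scratchdir, dirname, basename)
    return mapping


workdir = os.path.join(os.sep, "work")
scratchdir = os.path.join(os.sep, "scratch")
inside = os.path.join(workdir, "sub", "a.fits")     # below workdir
outside = os.path.join(os.sep, "other", "b.fits")   # not below workdir
file_mapping = map_files_sketch([inside, outside], scratchdir, workdir)
print(file_mapping)
```

Files below workdir keep their relative directory layout in the scratch area, while files from elsewhere land directly in the scratch directory.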

class fermipy.jobs.file_archive.FileStatus[source]

Bases: object

Enumeration of file status types

exists = 2
expected = 1
missing = 3
no_file = 0
superseded = 4
temp_removed = 5
fermipy.jobs.file_archive.get_timestamp()[source]

Get the current time as an integer

fermipy.jobs.file_archive.get_unique_match(table, colname, value)[source]

Get the row matching value for a particular column. If exactly one row matches, return the index of that row; otherwise raise KeyError.
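
A minimal sketch of the "exactly one match or KeyError" rule. To stay free of dependencies, the hypothetical `get_unique_match_sketch` takes a plain list standing in for a table column rather than an astropy table.

```python
def get_unique_match_sketch(column, value):
    """Return the index of the single entry equal to ``value``."""
    matches = [idx for idx, entry in enumerate(column) if entry == value]
    if len(matches) != 1:
        raise KeyError("%i rows match %s" % (len(matches), value))
    return matches[0]


paths = ["a.fits", "b.fits", "c.fits", "b.fits"]
unique_index = get_unique_match_sketch(paths, "c.fits")
print(unique_index)
```

Both zero matches and several matches raise KeyError, so a caller can treat the returned index as unambiguous.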

fermipy.jobs.file_archive.main_browse()[source]

Entry point for command line use for browsing a FileArchive

fermipy.jobs.job_archive module

Classes and utilities to keep track of the various jobs that comprise an analysis pipeline.

The main class is JobArchive, which keeps track of all the jobs associated with an analysis.

The JobDetails helper class encapsulates information on an instance of running a job.

class fermipy.jobs.job_archive.JobArchive(**kwargs)[source]

Bases: object

Class that keeps track of all the jobs associated with an analysis.

Parameters:
  • table_file (str) – Path to the file used to persist this JobArchive
  • table (astropy.table.Table) – Persistent representation of this JobArchive
  • table_ids (astropy.table.Table) – Ancillary table with information about file ids
  • file_archive (FileArchive) – Archive with information about all the files used and produced by this analysis
__getitem__(fullkey)[source]

Return the JobDetails matching fullkey

static build_archive(**kwargs)[source]

Return the singleton JobArchive instance, building it if needed

static build_temp_job_archive()[source]

Build and return a JobArchive using default locations of persistent files.

cache

Return the transient representation of this JobArchive

file_archive

Return the FileArchive with information about all the files used and produced by this analysis

static get_archive()[source]

Return the singleton JobArchive instance

get_details(jobname, jobkey)[source]

Get the JobDetails associated to a particular job instance

make_job_details(row_idx)[source]

Create a JobDetails from an astropy.table.row.Row

register_job(job_details)[source]

Register a job in this JobArchive

Register a job in the JobArchive from a Link object

register_jobs(job_dict)[source]

Register a bunch of jobs in this archive

table

Return the persistent representation of this JobArchive

table_file

Return the path to the file used to persist this JobArchive

table_ids

Return the persistent representation of the ancillary info of this JobArchive

update_job(job_details)[source]

Update a job in the JobArchive

update_job_status(checker_func)[source]

Update the status of all the jobs in the archive

write_table_file(job_table_file=None, file_table_file=None)[source]

Write the table to self._table_file

class fermipy.jobs.job_archive.JobDetails(**kwargs)[source]

Bases: object

A simple structure to keep track of the details of each of the sub-process jobs.

Parameters:
  • dbkey (int) – A unique key to identify this job
  • jobname (str) – A name used to identify this job
  • jobkey (str) – A string to identify this instance of the job
  • appname (str) – The executable invoked to run the job
  • logfile (str) – The logfile for this job, may be used to check for success/failure
  • job_config (dict) – A dictionary with the arguments for the job
  • parent_id (int) – Unique key identifying the parent job
  • infile_ids (list of int) – Keys to identify input files to this job
  • outfile_ids (list of int) – Keys to identify output files from this job
  • rmfile_ids (list of int) – Keys to identify temporary files removed by this job
  • intfile_ids (list of int) – Keys to identify internal files
  • status (int) – Current job status, one of the enums above
append_to_tables(table, table_ids)[source]

Add this instance as a row on an astropy.table.Table

check_status_logfile(checker_func)[source]

Check on the status of this particular job using the logfile

static create_from_row(table_row, table_id_array)[source]

Create a JobDetails from an astropy.table.row.Row

fullkey

Return the fullkey for this job: fullkey = <jobkey>@<jobname>

get_file_ids(file_archive, creator=None, status=0)[source]

Fill the file id arrays from the file lists

Parameters:
  • file_archive (FileArchive) – Used to look up file ids
  • creator (int) – A unique key for the job that created these files
  • status (FileStatus) – Enumeration giving current status of these files
get_file_paths(file_archive, file_id_array)[source]

Get the full paths of the files used by this object from the id arrays

static make_dict(table, table_ids)[source]

Build a dictionary mapping int to JobDetails from an astropy.table.Table

static make_fullkey(jobkey, jobname)[source]

Combine jobname and jobkey to make a unique key: fullkey = <jobkey>@<jobname>

static make_tables(job_dict)[source]

Build and return an astropy.table.Table to store JobDetails

static split_fullkey(fullkey)[source]

Split fullkey to extract jobname and jobkey: fullkey = <jobkey>@<jobname>
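
The fullkey convention can be sketched as a pair of inverse helpers. The names below are hypothetical, and the sketch assumes that neither jobkey nor jobname contains an "@" character.

```python
def make_fullkey_sketch(jobkey, jobname):
    """Combine the two keys as <jobkey>@<jobname>."""
    return "%s@%s" % (jobkey, jobname)


def split_fullkey_sketch(fullkey):
    """Return (jobname, jobkey) from a <jobkey>@<jobname> string."""
    jobkey, jobname = fullkey.split("@")  # assumes exactly one '@'
    return jobname, jobkey


fullkey = make_fullkey_sketch("draco", "analyze-roi")
jobname, jobkey = split_fullkey_sketch(fullkey)
print(fullkey, jobname, jobkey)
```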

update_table_row(table, row_idx)[source]

Add this instance as a row on an astropy.table.Table

class fermipy.jobs.job_archive.JobStatus[source]

Bases: object

Enumeration of job status types

done = 3
failed = 4
no_job = 0
pending = 1
running = 2
fermipy.jobs.job_archive.get_matches(table, colname, value)[source]

Get the rows matching a value for a particular column.

fermipy.jobs.job_archive.main_browse()[source]

Entry point for command line use for browsing a JobArchive

fermipy.jobs.lsf_impl module

Implementation of ScatterGather class for dealing with LSF batch jobs

class fermipy.jobs.lsf_impl.LsfScatterGather(**kwargs)[source]

Bases: fermipy.jobs.scatter_gather.ScatterGather

Implementation of ScatterGather that uses LSF

check_job(job_details)[source]

Check the status of a single job

Returns str, one of ‘Pending’, ‘Running’, ‘Done’, ‘Failed’

default_options = {'jobs_per_cycle': (20, 'Maximum number of jobs to submit in each cycle.', <type 'int'>), 'time_per_cycle': (15.0, 'Time per submission cycle in seconds.', <type 'float'>), 'dry_run': (False, 'Print commands, but do not execute them', <type 'bool'>), 'max_jobs': (500, 'Limit on the number of running or queued jobs.', <type 'int'>), 'action': ('run', 'Action to perform', <type 'str'>), 'max_job_age': (90.0, 'Max job age in minutes.', <type 'float'>), 'check_status_once': (False, 'Check status only once before proceeding', <type 'bool'>), 'job_check_sleep': (300, 'Sleep time between checking on job status (s)', <type 'int'>), 'force_gather': (False, 'Force gathering stage', <type 'bool'>), 'print_update': (False, 'Print summary of job status', <type 'bool'>)}
dispatch_job_hook(link, key, job_config, logfile)[source]

Send a single job to the LSF batch

Parameters:
  • link (fermipy.jobs.chain.Link) – The link used to invoke the command we are running
  • key (str) – A string that identifies this particular instance of the job
  • job_config (dict) – A dictionary with the arguments for the job. Used with the self._command_template job template
  • logfile (str) – The logfile for this job, may be used to check for success/ failure
submit_jobs(link, job_dict=None)[source]

Submit all the jobs in job_dict

fermipy.jobs.lsf_impl.build_bsub_command(command_template, lsf_args)[source]

Build and return an LSF batch command template

The structure will be ‘bsub -s <key> <value> <command_template>’
where <key> and <value> refer to items in lsf_args

Build a ScatterGather that will run multiple instance of a single link

fermipy.jobs.lsf_impl.check_log(logfile, exited='Exited with exit code', successful='Successfully completed')[source]

Check a log file to determine status of LSF job

Often logfile doesn’t exist because the job hasn’t begun to run. It is unclear what you want to do in that case...

Parameters:
  • logfile (str) – String with path to logfile
  • exited (str) – Value to check for in existing logfile for exit with failure
  • successful (str) – Value to check for in existing logfile for success
Returns: str, one of 'Pending', 'Running', 'Done', 'Failed'
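
One possible reading of the log check is sketched below for Python 3. `check_log_sketch` is hypothetical: reporting 'Pending' for a missing log file and 'Running' for a log with neither marker are assumptions, since the description above leaves the missing-file case open.

```python
import os
import tempfile


def check_log_sketch(logfile, exited="Exited with exit code",
                     successful="Successfully completed"):
    """Classify a job from its log file contents."""
    if not os.path.exists(logfile):
        return "Pending"   # assumption: no log yet means the job has not started
    with open(logfile) as fin:
        for line in fin:
            if successful in line:
                return "Done"
            if exited in line:
                return "Failed"
    return "Running"       # assumption: log exists but has no final marker yet


with tempfile.TemporaryDirectory() as tmpdir:
    log = os.path.join(tmpdir, "job.log")
    status_missing = check_log_sketch(log)
    with open(log, "w") as fout:
        fout.write("Starting job\n")
    status_running = check_log_sketch(log)
    with open(log, "a") as fout:
        fout.write("Successfully completed.\n")
    status_done = check_log_sketch(log)
```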
fermipy.jobs.lsf_impl.get_lsf_status()[source]

Count and print the number of jobs in various LSF states

fermipy.jobs.scatter_gather module

Abstract interface for parallel execution of multiple jobs.

The main class is ScatterGather, which can submit many instances of a job with different configurations.

The ConfigMaker abstract helper class is used to generate configurations.

class fermipy.jobs.scatter_gather.ConfigMaker(link, **kwargs)[source]

Bases: object

Abstract helper class to build configuration dictionaries for parallel jobs.

Sub-classes will need to:

Define options by passing a dictionary of option tuples to the constructor. This will take a form something like:

options=dict(string_opt=("default", "Some string", str),
             float_opt=(3.0, "Some float", float),
             list_opt=(None, "Some list", list))
add_arguments(parser, action)[source]

Hook to add arguments to an argparse.ArgumentParser

add_options(option_dict)[source]

Add options into an option dictionary

build_job_configs(args)[source]

Hook to build job configurations

Sub-class implementation should return three dictionaries:

input_config : dict
Configuration options passed to initializers
job_configs : dict
Dictionary of dictionaries passed to parallel jobs
output_config : dict
Configuration options passed to gatherer
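
To make the three return values concrete, here is a hedged sketch of what a sub-class hook might compute. The function `build_job_configs_sketch`, the argument names (`outdir`, `targets`) and the file naming are all invented for illustration.

```python
def build_job_configs_sketch(args):
    """Return (input_config, job_configs, output_config) for a list of targets."""
    outdir = args["outdir"]
    input_config = {"outdir": outdir}
    job_configs = {}
    for target in args["targets"]:
        # One configuration dictionary per parallel job, keyed by job name
        job_configs[target] = {"target": target,
                               "outfile": "%s/%s.fits" % (outdir, target)}
    outfiles = [job_configs[target]["outfile"] for target in args["targets"]]
    # The gather step receives the list of per-job outputs to merge
    output_config = {"infiles": outfiles, "outfile": "%s/merged.fits" % outdir}
    return input_config, job_configs, output_config


input_config, job_configs, output_config = build_job_configs_sketch(
    {"outdir": "results", "targets": ["draco", "segue_1"]})
print(sorted(job_configs))
```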
make_base_config(args)[source]

Hook to build a baseline job configuration

Parameters:args (dict) – Command line arguments, see add_arguments
class fermipy.jobs.scatter_gather.ScatterGather(**kwargs)[source]

Bases: fermipy.jobs.chain.Link

Abstract base class to dispatch several jobs in parallel and collect and merge the results.

__call__(argv)[source]

Parses command line arguments and runs the requested action

Parameters:
  • argv (list-like) – List of command line arguments
Returns: str with status
base_config

Return the baseline job configuration

build_configs(args)[source]

Build the configuration objects.

This invokes the ConfigMaker to build the configurations

build_job_dict()[source]

Build a dictionary of JobDetails objects for the internal Link

check_job(job_details)[source]

Check the status of a specific job

check_status()[source]

Check on the status of all the jobs in job dict.

Returns:
  • running (bool) – True if jobs are still running
  • failed (bool) – True if any jobs have failed
config_maker

Return the object used to translate arguments

default_gather_logfile = 'gather.log'
default_init_logfile = 'init.log'
default_options = {'dry_run': (False, 'Print commands, but do not execute them', <type 'bool'>), 'action': ('run', 'Action to perform', <type 'str'>), 'check_status_once': (False, 'Check status only once before proceeding', <type 'bool'>), 'job_check_sleep': (300, 'Sleep time between checking on job status (s)', <type 'int'>), 'force_gather': (False, 'Force gathering stage', <type 'bool'>), 'print_update': (False, 'Print summary of job status', <type 'bool'>)}
default_prefix_logfile = 'scatter'
dispatch_job(link, key)[source]

Function to dispatch a single job

Parameters:
  • link (Link) – Link object that sends the job
  • key (str) – Key used to identify this particular job
Returns: JobDetails object
dispatch_job_hook(link, key, job_config, logfile)[source]

Hook to dispatch a single job

Return the Link object used in the scatter phase of processing

gather_results()[source]

Run the gather Link using self._output_config as input

Returns a JobStatus enum

get_jobs(recursive=True)[source]

Return a dictionary with all the jobs

If recursive is True this will include jobs from all internal Link

initialize()[source]

Run the initialization Link using self._input_config as input.

Returns a JobStatus enum

Return the Link object used to initialize the processing

invoke(argv)[source]

Invoke this object to perform a particular action

Parameters:
  • argv (list-like) – List of command line arguments, passed to helper classes
Returns: str with status
job_archive

Return the JobArchive object

no_batch

Return the value of the no_batch flag

print_failed(stream=<open file '<stderr>', mode 'w'>)[source]

Print list of the failed jobs

print_summary(stream=<open file '<stdout>', mode 'w'>, indent='', recurse_level=2)[source]

Print a summary of the activity done by this Link.

Parameters:
  • stream (file) – Stream to print to
  • indent (str) – Indentation at start of line
  • recurse_level (int) – Number of recursion levels to print
print_update(stream=<open file '<stdout>', mode 'w'>)[source]

Print an update about the current number of jobs running

resubmit()[source]

Function to resubmit failed jobs and collect results

run(stream=<open file '<stdout>', mode 'w'>, dry_run=False)[source]
run_argparser(argv)[source]

Initialize self with a set of arguments

run_jobs()[source]

Function to dispatch jobs and collect results

Return the Link object used in the scatter phase of processing

submit_jobs(link, job_dict=None)[source]

Run the Link with all of the items in job_dict as input.

If job_dict is None, the job_dict will be taken from link.jobs

Returns a JobStatus enum

update_args(override_args)[source]

Update the arguments used to invoke the application

Note that this will also update the dictionary of input and output files

Parameters:override_args (dict) – dictionary of arguments to override the current values
fermipy.jobs.scatter_gather.clean_job(logfile, outfiles, dry_run=False)[source]

Removes log file and files created by failed jobs.

If dry_run is True, print name of files to be removed, but do not remove them.

fermipy.jobs.scatter_gather.remove_file(filepath, dry_run=False)[source]

Remove the file at filepath

Catches exception if the file does not exist.

If dry_run is True, print name of file to be removed, but do not remove it.
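
The two clean-up helpers can be sketched as follows for Python 3. `remove_file_sketch` and `clean_job_sketch` are hypothetical stand-ins; printing an `rm` line in dry-run mode is an assumption about the output format.

```python
import os
import tempfile


def remove_file_sketch(filepath, dry_run=False):
    """Remove ``filepath``; a missing file is silently ignored."""
    if dry_run:
        print("rm %s" % filepath)
        return
    try:
        os.remove(filepath)
    except OSError:
        pass  # the file does not exist, nothing to remove


def clean_job_sketch(logfile, outfiles, dry_run=False):
    """Remove the log file and the output files of a failed job."""
    for filepath in [logfile] + list(outfiles):
        remove_file_sketch(filepath, dry_run)


with tempfile.TemporaryDirectory() as tmpdir:
    logfile = os.path.join(tmpdir, "job.log")
    outfile = os.path.join(tmpdir, "out.fits")
    for path in (logfile, outfile):
        open(path, "w").close()
    clean_job_sketch(logfile, [outfile], dry_run=True)
    kept_on_dry_run = os.path.exists(logfile) and os.path.exists(outfile)
    # The third path was never created; removing it must not raise
    clean_job_sketch(logfile, [outfile, os.path.join(tmpdir, "never_made.fits")])
    removed = not os.path.exists(logfile) and not os.path.exists(outfile)
```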

Module contents