fermipy.jobs subpackage

The fermipy.jobs sub-package is a lightweight, largely standalone package to manage data analysis pipelines. It allows the user to build up increasingly complex analysis pipelines from single applications that are callable either from inside Python or from the Unix command line.
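
For example, the AnalyzeROI link documented below can be driven either way. A minimal sketch, assuming a fermipy config file at the illustrative path config.yaml (the command-line option spelling is assumed):

    from fermipy.jobs.target_analysis import AnalyzeROI

    link = AnalyzeROI()
    link.update_args(dict(config='config.yaml', make_plots=True))
    link.run(dry_run=True)   # print the command rather than executing it

    # Equivalent shell invocation (option spelling assumed):
    #   fermipy-analyze-roi --config config.yaml --make_plots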

Module contents

ScatterGather class

class fermipy.jobs.scatter_gather.ScatterGather(link, **kwargs)[source]

Bases: fermipy.jobs.link.Link

Class to dispatch several jobs in parallel and collect and merge the results.

Sub-classes will need to generate configurations for the jobs that they launch.

Parameters:
  • clientclass (type) – Type of Link object managed by this class.
  • job_time (int) – Estimated maximum time it takes to run a job, in seconds. This is used to manage batch farm scheduling and checking for completion.
appname = 'dummy-sg'
build_job_configs(args)[source]

Hook to build job configurations

Sub-class implementation should return:

job_configs : dict
Dictionary of dictionaries passed to parallel jobs
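
A minimal sketch of such a sub-class, assuming AnalyzeROI (documented below) as the client Link; the class name, appname, and target names are hypothetical:

    from fermipy.jobs.scatter_gather import ScatterGather
    from fermipy.jobs.target_analysis import AnalyzeROI

    class MyAnalysis_SG(ScatterGather):
        """Hypothetical dispatcher: one AnalyzeROI job per target."""
        appname = 'my-analysis-sg'
        clientclass = AnalyzeROI
        job_time = 1500                 # seconds; guides batch scheduling
        default_options = dict(ScatterGather.default_options_base)

        def build_job_configs(self, args):
            # One entry per parallel job; each value is the argument
            # dictionary handed to a clientclass instance.
            job_configs = {}
            for target in ['target_a', 'target_b']:     # illustrative targets
                job_configs[target] = dict(config='%s/config.yaml' % target,
                                           logfile='%s/analyze.log' % target)
            return job_configs
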
check_status(stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>, check_once=False, fail_pending=False, fail_running=False, no_wait=False, do_print=True, write_status=False)[source]

Loop to check on the status of all the jobs in the job dict.

Parameters:
  • stream (file) – Stream that this function will print to; must have a ‘write’ method.
  • check_once (bool) – Check status once and exit the loop.
  • fail_pending (bool) – If True, consider pending jobs as failed
  • fail_running (bool) – If True, consider running jobs as failed
  • no_wait (bool) – Do not sleep before checking jobs.
  • do_print (bool) – Print summary stats.
  • write_status (bool) – Write the status to the log file.
Returns:

status_vect – Vector that summarizes the number of jobs in various states.

Return type:

JobStatusVector

clean_jobs(recursive=False)[source]

Clean up all the jobs associated with this object.

If recursive is True, this also cleans jobs dispatched by this object.

clear_jobs(recursive=True)[source]

Clear the self.jobs dictionary that contains information about jobs associated with this ScatterGather

If recursive is True, this will include jobs from all internal Link objects

clientclass = None
classmethod create(**kwargs)[source]

Build and return a ScatterGather object

default_options = {}
default_options_base = {'action': ('run', 'Action to perform', <class 'str'>), 'check_status_once': (False, 'Check status only once before proceeding', <class 'bool'>), 'dry_run': (False, 'Print commands, but do not execute them', <class 'bool'>), 'job_check_sleep': (300, 'Sleep time between checking on job status (s)', <class 'int'>), 'print_update': (False, 'Print summary of job status', <class 'bool'>)}
default_prefix_logfile = 'scatter'
description = 'Run multiple analyses'
get_jobs(recursive=True)[source]

Return a dictionary with all the jobs

If recursive is True, this will include jobs from all internal Link objects

job_time = 1500
classmethod main()[source]

Hook for command line interface to sub-classes

print_failed(stream=<_io.TextIOWrapper name='<stderr>' mode='w' encoding='utf-8'>)[source]

Print a list of the failed jobs

print_summary(stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>, indent='', recurse_level=2)[source]

Print a summary of the activity done by this Link.

Parameters:
  • stream (file) – Stream to print to
  • indent (str) – Indentation at start of line
  • recurse_level (int) – Number of recursion levels to print
print_update(stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>, job_stats=None)[source]

Print an update about the current number of jobs running

resubmit(stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>, fail_running=False, resubmit_failed=False)[source]

Function to resubmit failed jobs and collect results

Parameters:
  • stream (file) – Stream that this function will print to; must have a ‘write’ method.
  • fail_running (bool) – If True, consider running jobs as failed
  • resubmit_failed (bool) – Resubmit failed jobs.
Returns:

status_vect – Vector that summarizes the number of jobs in various states.

Return type:

JobStatusVector

run(stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>, dry_run=False, stage_files=True, resubmit_failed=True)[source]

Runs this Link.

This version is intended to be overwritten by sub-classes so as to provide a single function that behaves the same for all versions of Link

Parameters:
  • stream (file) – Stream that this Link will print to; must have a ‘write’ method
  • dry_run (bool) – Print command but do not run it.
  • stage_files (bool) – Copy files to and from scratch staging area.
  • resubmit_failed (bool) – Flag for sub-classes to resubmit failed jobs.
run_analysis(argv)[source]

Implemented by sub-classes to run a particular analysis

run_jobs(stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>, resubmit_failed=False)[source]

Function to dispatch jobs and collect results

Parameters:
  • stream (file) – Stream that this function will print to; must have a ‘write’ method.
  • resubmit_failed (bool) – Resubmit failed jobs.
Returns:

status_vect – Vector that summarizes the number of jobs in various states.

Return type:

JobStatusVector

scatter_link

Return the Link object used for the scatter phase of processing

update_args(override_args)[source]

Update the arguments used to invoke the application

Note that this will also update the dictionary of input and output files

Parameters: override_args (dict) – Dictionary of arguments to override the current values
usage = 'dummy-sg [options]'

Chain class

class fermipy.jobs.chain.Chain(**kwargs)[source]

Bases: fermipy.jobs.link.Link

An object tying together a series of applications into a single application.

This class keeps track of the arguments to pass to the applications as well as input and output files.

Note that this class is itself a Link. This allows you to write a Python module that implements a chain and also has a __main__ function to allow it to be called from the shell.
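
That pattern, sketched with a hypothetical sub-class (the body that registers the individual Link objects is omitted):

    # my_chain.py -- a hypothetical module implementing a Chain
    from fermipy.jobs.chain import Chain

    class MyChain(Chain):
        """Hypothetical chain tying several Link objects together."""
        appname = 'my-chain'
        linkname_default = 'my-chain'
        description = 'Run my multi-step analysis'

    if __name__ == '__main__':
        MyChain.main()   # command-line hook documented below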

check_links_status(fail_running=False, fail_pending=False)[source]

Check the status of all the jobs run from the Link objects in this Chain and return a status flag that summarizes that.

Parameters:
  • fail_running (bool) – If True, consider running jobs as failed
  • fail_pending (bool) – If True, consider pending jobs as failed
Returns:

status – Job status flag that summarizes the status of all the jobs.

Return type:

JobStatus

clear_jobs(recursive=True)[source]

Clear the dictionary containing all the jobs

If recursive is True, this will include jobs from all internal Link objects

get_jobs(recursive=True)[source]

Return a dictionary with all the jobs

If recursive is True, this will include jobs from all internal Link objects

linknames

Return the names of the Link objects owned by this Chain

links

Return the OrderedDict of Link objects owned by this Chain

load_config(configfile)[source]

Read a config file for the top-level arguments

classmethod main()[source]

Hook to run this Chain from the command line

missing_input_files()[source]

Make and return a dictionary of the missing input files.

This returns a dictionary mapping filepath to the list of Link objects that use the file as input.

missing_output_files()[source]

Make and return a dictionary of the missing output files.

This returns a dictionary mapping filepath to the list of Link objects that produce the file as output.

print_status(indent='', recurse=False)[source]

Print a summary of the job status for each Link in this Chain

print_summary(stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>, indent='', recurse_level=2)[source]

Print a summary of the activity done by this Chain.

Parameters:
  • stream (file) – Stream to print to; must have a ‘write’ method.
  • indent (str) – Indentation at start of line
  • recurse_level (int) – Number of recursion levels to print
run(stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>, dry_run=False, stage_files=True, resubmit_failed=False)[source]

Runs this Chain.

Parameters:
  • stream (file) – Stream that this Link will print to; must have a ‘write’ method
  • dry_run (bool) – Print command but do not run it.
  • stage_files (bool) – Copy files to and from scratch staging area.
  • resubmit_failed (bool) – Flag for sub-classes to resubmit failed jobs.
run_analysis(argv)[source]

Implemented by sub-classes to run a particular analysis

update_args(override_args)[source]

Update the arguments used to invoke the application

Note that this will also update the dictionary of input and output files.

Parameters: override_args (dict) – Dictionary passed to the links

High-level analysis classes

These are Link sub-classes that implement fermipy analyses, or perform tasks related to fermipy analyses, such as plotting or collecting results for a set of simulations.

class fermipy.jobs.target_analysis.AnalyzeROI(**kwargs)[source]

Bases: fermipy.jobs.link.Link

Small class that wraps an analysis script.

This particular script does baseline fitting of an ROI.
Parameters:
  • config (<class 'str'>) – Path to fermipy config file. [None]
  • roi_baseline (<class 'str'>) – Key for roi baseline file. [fit_baseline]
  • make_plots (<class 'bool'>) – Make plots [False]
appname = 'fermipy-analyze-roi'
default_options = {'config': (None, 'Path to fermipy config file.', <class 'str'>), 'make_plots': (False, 'Make plots', <class 'bool'>), 'roi_baseline': ('fit_baseline', 'Key for roi baseline file.', <class 'str'>)}
description = 'Run analysis of a single ROI'
linkname_default = 'analyze-roi'
run_analysis(argv)[source]

Run this analysis

usage = 'fermipy-analyze-roi [options]'
class fermipy.jobs.target_analysis.AnalyzeSED(**kwargs)[source]

Bases: fermipy.jobs.link.Link

Small class to wrap an analysis script.

This particular script fits an SED for a target source with respect to the baseline ROI model.
Parameters:
  • config (<class 'str'>) – Path to fermipy config file. [None]
  • roi_baseline (<class 'str'>) – Key for roi baseline file. [fit_baseline]
  • skydirs (<class 'str'>) – Yaml file with blank sky directions. [None]
  • profiles (<class 'list'>) – List of profiles to analyze [[]]
  • make_plots (<class 'bool'>) – Make plots [False]
  • non_null_src (<class 'bool'>) – Zero out test source [False]
appname = 'fermipy-analyze-sed'
default_options = {'config': (None, 'Path to fermipy config file.', <class 'str'>), 'make_plots': (False, 'Make plots', <class 'bool'>), 'non_null_src': (False, 'Zero out test source', <class 'bool'>), 'profiles': ([], 'List of profiles to analyze', <class 'list'>), 'roi_baseline': ('fit_baseline', 'Key for roi baseline file.', <class 'str'>), 'skydirs': (None, 'Yaml file with blank sky directions.', <class 'str'>)}
description = 'Extract the SED for a single target'
linkname_default = 'analyze-sed'
run_analysis(argv)[source]

Run this analysis

usage = 'fermipy-analyze-sed [options]'
class fermipy.jobs.target_collect.CollectSED(**kwargs)[source]

Bases: fermipy.jobs.link.Link

Small class to collect SED results from a series of simulations.

Parameters:
  • sed_file (<class 'str'>) – Path to SED file. [None]
  • outfile (<class 'str'>) – Path to output file. [None]
  • config (<class 'str'>) – Path to fermipy config file. [None]
  • summaryfile (<class 'str'>) – Path to file with results summaries. [None]
  • nsims (<class 'int'>) – Number of simulations to run. [20]
  • enumbins (<class 'int'>) – Number of energy bins [12]
  • seed (<class 'int'>) – Seed number for first simulation. [0]
  • dry_run (<class 'bool'>) – Print commands but do not run them. [False]
appname = 'fermipy-collect-sed'
collist = [{'name': 'e_min', 'unit': 'MeV'}, {'name': 'e_ref', 'unit': 'MeV'}, {'name': 'e_max', 'unit': 'MeV'}, {'name': 'ref_dnde_e_min', 'unit': 'cm-2 MeV-1 ph s-1'}, {'name': 'ref_dnde_e_max', 'unit': 'cm-2 MeV-1 ph s-1'}, {'name': 'ref_dnde', 'unit': 'cm-2 MeV-1 ph s-1'}, {'name': 'ref_flux', 'unit': 'cm-2 ph s-1'}, {'name': 'ref_eflux', 'unit': 'cm-2 MeV s-1'}, {'name': 'ref_npred'}, {'name': 'dnde', 'unit': 'cm-2 MeV-1 ph s-1'}, {'name': 'dnde_err', 'unit': 'cm-2 MeV-1 ph s-1'}, {'name': 'dnde_errp', 'unit': 'cm-2 MeV-1 ph s-1'}, {'name': 'dnde_errn', 'unit': 'cm-2 MeV-1 ph s-1'}, {'name': 'dnde_ul', 'unit': 'cm-2 MeV-1 ph s-1'}, {'name': 'e2dnde', 'unit': 'cm-2 MeV s-1'}, {'name': 'e2dnde_err', 'unit': 'cm-2 MeV s-1'}, {'name': 'e2dnde_errp', 'unit': 'cm-2 MeV s-1'}, {'name': 'e2dnde_errn', 'unit': 'cm-2 MeV s-1'}, {'name': 'e2dnde_ul', 'unit': 'cm-2 MeV s-1'}, {'name': 'norm'}, {'name': 'norm_err'}, {'name': 'norm_errp'}, {'name': 'norm_errn'}, {'name': 'norm_ul'}, {'name': 'ts'}]
default_options = {'config': (None, 'Path to fermipy config file.', <class 'str'>), 'dry_run': (False, 'Print commands but do not run them.', <class 'bool'>), 'enumbins': (12, 'Number of energy bins', <class 'int'>), 'nsims': (20, 'Number of simulations to run.', <class 'int'>), 'outfile': (None, 'Path to output file.', <class 'str'>), 'sed_file': (None, 'Path to SED file.', <class 'str'>), 'seed': (0, 'Seed number for first simulation.', <class 'int'>), 'summaryfile': (None, 'Path to file with results summaries.', <class 'str'>)}
description = 'Collect SED results from simulations'
linkname_default = 'collect-sed'
run_analysis(argv)[source]

Run this analysis

usage = 'fermipy-collect-sed [options]'
class fermipy.jobs.target_sim.CopyBaseROI(**kwargs)[source]

Bases: fermipy.jobs.link.Link

Small class to copy a baseline ROI to a simulation area
This is useful for parallelizing analysis using the fermipy.jobs module.
Parameters:
  • ttype (<class 'str'>) – Type of target being analyzed. [None]
  • target (<class 'str'>) – Name of analysis target. [None]
  • roi_baseline (<class 'str'>) – Key for roi baseline file. [fit_baseline]
  • extracopy (<class 'list'>) – Extra files to copy [[]]
  • sim (<class 'str'>) – Name of the simulation scenario. [None]
appname = 'fermipy-copy-base-roi'
classmethod copy_analysis_files(orig_dir, dest_dir, copyfiles)[source]

Copy a list of files from orig_dir to dest_dir

classmethod copy_target_dir(orig_dir, dest_dir, roi_baseline, extracopy)[source]

Create and populate directories for target analysis

copyfiles = ['srcmap_*.fits', 'ccube.fits', 'ccube_*.fits']
default_options = {'extracopy': ([], 'Extra files to copy', <class 'list'>), 'roi_baseline': ('fit_baseline', 'Key for roi baseline file.', <class 'str'>), 'sim': (None, 'Name of the simulation scenario.', <class 'str'>), 'target': (None, 'Name of analysis target.', <class 'str'>), 'ttype': (None, 'Type of target being analyzed.', <class 'str'>)}
description = 'Copy a baseline ROI to a simulation area'
linkname_default = 'copy-base-roi'
run_analysis(argv)[source]

Run this analysis

usage = 'fermipy-copy-base-roi [options]'
class fermipy.jobs.target_sim.RandomDirGen(**kwargs)[source]

Bases: fermipy.jobs.link.Link

Small class to generate random sky directions inside an ROI
This is useful for parallelizing analysis using the fermipy.jobs module.
Parameters:
  • config (<class 'str'>) – Path to fermipy config file. [None]
  • rand_config (<class 'str'>) – Path to the config file for generating random sky directions [None]
  • outfile (<class 'str'>) – Path to output file. [None]
appname = 'fermipy-random-dir-gen'
default_options = {'config': (None, 'Path to fermipy config file.', <class 'str'>), 'outfile': (None, 'Path to output file.', <class 'str'>), 'rand_config': (None, 'Path to config file for genaration random sky dirs', <class 'str'>)}
description = 'Generate random sky directions in an ROI'
linkname_default = 'random-dir-gen'
run_analysis(argv)[source]

Run this analysis

usage = 'fermipy-random-dir-gen [options]'
class fermipy.jobs.target_sim.SimulateROI(**kwargs)[source]

Bases: fermipy.jobs.link.Link

Small class to wrap an analysis script.
This is useful for parallelizing analysis using the fermipy.jobs module.
Parameters:
  • config (<class 'str'>) – Path to fermipy config file. [None]
  • roi_baseline (<class 'str'>) – Key for roi baseline file. [fit_baseline]
  • profiles (<class 'list'>) – List of profiles to analyze [[]]
  • non_null_src (<class 'bool'>) – Zero out test source [False]
  • do_find_src (<class 'bool'>) – Add source finding step to simulated realizations [False]
  • sim_profile (<class 'str'>) – Name of the profile to use for simulation. [default]
  • sim (<class 'str'>) – Name of the simulation scenario. [None]
  • nsims (<class 'int'>) – Number of simulations to run. [20]
  • seed (<class 'int'>) – Seed number for first simulation. [0]
appname = 'fermipy-simulate-roi'
default_options = {'config': (None, 'Path to fermipy config file.', <class 'str'>), 'do_find_src': (False, 'Add source finding step to simulated realizations', <class 'bool'>), 'non_null_src': (False, 'Zero out test source', <class 'bool'>), 'nsims': (20, 'Number of simulations to run.', <class 'int'>), 'profiles': ([], 'List of profiles to analyze', <class 'list'>), 'roi_baseline': ('fit_baseline', 'Key for roi baseline file.', <class 'str'>), 'seed': (0, 'Seed number for first simulation.', <class 'int'>), 'sim': (None, 'Name of the simulation scenario.', <class 'str'>), 'sim_profile': ('default', 'Name of the profile to use for simulation.', <class 'str'>)}
description = 'Run simulated analysis of a single ROI'
linkname_default = 'simulate-roi'
run_analysis(argv)[source]

Run this analysis

usage = 'fermipy-simulate-roi [options]'
class fermipy.jobs.target_plotting.PlotCastro(**kwargs)[source]

Bases: fermipy.jobs.link.Link

Small class to plot an SED as a ‘Castro’ plot.

Parameters:
  • infile (<class 'str'>) – Path to input file. [None]
  • outfile (<class 'str'>) – Path to output file. [None]
appname = 'fermipy-plot-castro'
default_options = {'infile': (None, 'Path to input file.', <class 'str'>), 'outfile': (None, 'Path to output file.', <class 'str'>)}
description = 'Plot likelihood v. flux normalization and energy'
linkname_default = 'plot-castro'
run_analysis(argv)[source]

Run this analysis

usage = 'fermipy-plot-castro [options]'

High-level analysis job dispatch

These are ScatterGather sub-classes that invoke the Link sub-classes listed above.

class fermipy.jobs.target_analysis.AnalyzeROI_SG(link, **kwargs)[source]

Bases: fermipy.jobs.scatter_gather.ScatterGather

Small class to generate configurations for the AnalyzeROI class.

This loops over all the targets defined in the target list.
Parameters:
  • ttype (<class 'str'>) – Type of target being analyzed. [None]
  • targetlist (<class 'str'>) – Path to the target list. [None]
  • config (<class 'str'>) – Path to fermipy config file. [None]
  • roi_baseline (<class 'str'>) – Key for roi baseline file. [fit_baseline]
  • make_plots (<class 'bool'>) – Make plots [False]
appname = 'fermipy-analyze-roi-sg'
build_job_configs(args)[source]

Hook to build job configurations

clientclass

alias of AnalyzeROI

default_options = {'config': (None, 'Path to fermipy config file.', <class 'str'>), 'make_plots': (False, 'Make plots', <class 'bool'>), 'roi_baseline': ('fit_baseline', 'Key for roi baseline file.', <class 'str'>), 'targetlist': (None, 'Path to the target list.', <class 'str'>), 'ttype': (None, 'Type of target being analyzed.', <class 'str'>)}
description = 'Run analyses on a series of ROIs'
job_time = 1500
usage = 'fermipy-analyze-roi-sg [options]'
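
A sketch of driving this class from Python; the target type and file names are illustrative, and create() is the classmethod documented on ScatterGather above:

    from fermipy.jobs.target_analysis import AnalyzeROI_SG

    sg = AnalyzeROI_SG.create()
    sg.update_args(dict(ttype='dSphs',               # illustrative target type
                        targetlist='target_list.yaml',
                        config='config.yaml'))
    sg.run(dry_run=True)   # print the per-target commands without running them
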
class fermipy.jobs.target_analysis.AnalyzeSED_SG(link, **kwargs)[source]

Bases: fermipy.jobs.scatter_gather.ScatterGather

Small class to generate configurations for the AnalyzeSED class

This loops over all the targets defined in the target list, and over all the profiles defined for each target.
Parameters:
  • ttype (<class 'str'>) – Type of target being analyzed. [None]
  • targetlist (<class 'str'>) – Path to the target list. [None]
  • config (<class 'str'>) – Path to fermipy config file. [None]
  • roi_baseline (<class 'str'>) – Key for roi baseline file. [fit_baseline]
  • skydirs (<class 'str'>) – Yaml file with blank sky directions. [None]
  • make_plots (<class 'bool'>) – Make plots [False]
  • non_null_src (<class 'bool'>) – Zero out test source [False]
appname = 'fermipy-analyze-sed-sg'
build_job_configs(args)[source]

Hook to build job configurations

clientclass

alias of AnalyzeSED

default_options = {'config': (None, 'Path to fermipy config file.', <class 'str'>), 'make_plots': (False, 'Make plots', <class 'bool'>), 'non_null_src': (False, 'Zero out test source', <class 'bool'>), 'roi_baseline': ('fit_baseline', 'Key for roi baseline file.', <class 'str'>), 'skydirs': (None, 'Yaml file with blank sky directions.', <class 'str'>), 'targetlist': (None, 'Path to the target list.', <class 'str'>), 'ttype': (None, 'Type of target being analyzed.', <class 'str'>)}
description = 'Run analyses on a series of ROIs'
job_time = 1500
usage = 'fermipy-analyze-sed-sg [options]'
class fermipy.jobs.target_collect.CollectSED_SG(link, **kwargs)[source]

Bases: fermipy.jobs.scatter_gather.ScatterGather

Small class to generate configurations for CollectSED

This loops over all the targets defined in the target list
Parameters:
  • ttype (<class 'str'>) – Type of target being analyzed. [None]
  • targetlist (<class 'str'>) – Path to the target list. [None]
  • config (<class 'str'>) – Path to fermipy config file. [None]
  • sim (<class 'str'>) – Name of the simulation scenario. [None]
  • nsims (<class 'int'>) – Number of simulations to run. [20]
  • seed (<class 'int'>) – Seed number for first simulation. [0]
  • write_full (<class 'bool'>) – Write file with full collected results [False]
  • write_summary (<class 'bool'>) – Write file with summary of collected results [False]
appname = 'fermipy-collect-sed-sg'
build_job_configs(args)[source]

Hook to build job configurations

clientclass

alias of CollectSED

default_options = {'config': (None, 'Path to fermipy config file.', <class 'str'>), 'nsims': (20, 'Number of simulations to run.', <class 'int'>), 'seed': (0, 'Seed number for first simulation.', <class 'int'>), 'sim': (None, 'Name of the simulation scenario.', <class 'str'>), 'targetlist': (None, 'Path to the target list.', <class 'str'>), 'ttype': (None, 'Type of target being analyzed.', <class 'str'>), 'write_full': (False, 'Write file with full collected results', <class 'bool'>), 'write_summary': (False, 'Write file with summary of collected results', <class 'bool'>)}
description = 'Collect SED data from a set of simulations for a series of ROIs'
job_time = 120
usage = 'fermipy-collect-sed-sg [options]'
class fermipy.jobs.target_sim.CopyBaseROI_SG(link, **kwargs)[source]

Bases: fermipy.jobs.scatter_gather.ScatterGather

Small class to generate configurations for the CopyBaseROI class
This adds the following arguments:
Parameters:
  • ttype (<class 'str'>) – Type of target being analyzed. [None]
  • targetlist (<class 'str'>) – Path to the target list. [None]
  • roi_baseline (<class 'str'>) – Key for roi baseline file. [fit_baseline]
  • sim (<class 'str'>) – Name of the simulation scenario. [None]
  • extracopy (<class 'list'>) – Extra files to copy [[]]
appname = 'fermipy-copy-base-roi-sg'
build_job_configs(args)[source]

Hook to build job configurations

clientclass

alias of CopyBaseROI

default_options = {'extracopy': ([], 'Extra files to copy', <class 'list'>), 'roi_baseline': ('fit_baseline', 'Key for roi baseline file.', <class 'str'>), 'sim': (None, 'Name of the simulation scenario.', <class 'str'>), 'targetlist': (None, 'Path to the target list.', <class 'str'>), 'ttype': (None, 'Type of target being analyzed.', <class 'str'>)}
description = 'Run analyses on a series of ROIs'
job_time = 60
usage = 'fermipy-copy-base-roi-sg [options]'
class fermipy.jobs.target_sim.RandomDirGen_SG(link, **kwargs)[source]

Bases: fermipy.jobs.scatter_gather.ScatterGather

Small class to generate configurations for the RandomDirGen class
This adds the following arguments:
Parameters:
  • ttype (<class 'str'>) – Type of target being analyzed. [None]
  • targetlist (<class 'str'>) – Path to the target list. [None]
  • config (<class 'str'>) – Path to fermipy config file. [None]
  • rand_config (<class 'str'>) – Path to the config file for generating random sky directions [None]
  • sim (<class 'str'>) – Name of the simulation scenario. [None]
appname = 'fermipy-random-dir-gen-sg'
build_job_configs(args)[source]

Hook to build job configurations

clientclass

alias of RandomDirGen

default_options = {'config': (None, 'Path to fermipy config file.', <class 'str'>), 'rand_config': (None, 'Path to config file for genaration random sky dirs', <class 'str'>), 'sim': (None, 'Name of the simulation scenario.', <class 'str'>), 'targetlist': (None, 'Path to the target list.', <class 'str'>), 'ttype': (None, 'Type of target being analyzed.', <class 'str'>)}
description = 'Run analyses on a series of ROIs'
job_time = 60
usage = 'fermipy-random-dir-gen-sg [options]'
class fermipy.jobs.target_sim.SimulateROI_SG(link, **kwargs)[source]

Bases: fermipy.jobs.scatter_gather.ScatterGather

Small class to generate configurations for the SimulateROI class
This adds the following arguments:
Parameters:
  • ttype (<class 'str'>) – Type of target being analyzed. [None]
  • targetlist (<class 'str'>) – Path to the target list. [None]
  • config (<class 'str'>) – Path to fermipy config file. [None]
  • roi_baseline (<class 'str'>) – Key for roi baseline file. [fit_baseline]
  • non_null_src (<class 'bool'>) – Zero out test source [False]
  • do_find_src (<class 'bool'>) – Add source finding step to simulated realizations [False]
  • sim (<class 'str'>) – Name of the simulation scenario. [None]
  • sim_profile (<class 'str'>) – Name of the profile to use for simulation. [default]
  • nsims (<class 'int'>) – Number of simulations to run. [20]
  • seed (<class 'int'>) – Seed number for first simulation. [0]
  • nsims_job (<class 'int'>) – Number of simulations to run per job. [0]
appname = 'fermipy-simulate-roi-sg'
build_job_configs(args)[source]

Hook to build job configurations

clientclass

alias of SimulateROI

default_options = {'config': (None, 'Path to fermipy config file.', <class 'str'>), 'do_find_src': (False, 'Add source finding step to simulated realizations', <class 'bool'>), 'non_null_src': (False, 'Zero out test source', <class 'bool'>), 'nsims': (20, 'Number of simulations to run.', <class 'int'>), 'nsims_job': (0, 'Number of simulations to run per job.', <class 'int'>), 'roi_baseline': ('fit_baseline', 'Key for roi baseline file.', <class 'str'>), 'seed': (0, 'Seed number for first simulation.', <class 'int'>), 'sim': (None, 'Name of the simulation scenario.', <class 'str'>), 'sim_profile': ('default', 'Name of the profile to use for simulation.', <class 'str'>), 'targetlist': (None, 'Path to the target list.', <class 'str'>), 'ttype': (None, 'Type of target being analyzed.', <class 'str'>)}
description = 'Run analyses on a series of ROIs'
job_time = 1500
usage = 'fermipy-simulate-roi-sg [options]'
class fermipy.jobs.target_plotting.PlotCastro_SG(link, **kwargs)[source]

Bases: fermipy.jobs.scatter_gather.ScatterGather

Small class to generate configurations for the PlotCastro class.

This loops over all the targets defined in the target list.
Parameters:
  • ttype (<class 'str'>) – Type of target being analyzed. [None]
  • targetlist (<class 'str'>) – Path to the target list. [None]
appname = 'fermipy-plot-castro-sg'
build_job_configs(args)[source]

Hook to build job configurations

clientclass

alias of PlotCastro

default_options = {'targetlist': (None, 'Path to the target list.', <class 'str'>), 'ttype': (None, 'Type of target being analyzed.', <class 'str'>)}
description = 'Make castro plots for set of targets'
job_time = 60
usage = 'fermipy-plot-castro-sg [options]'

Batch and System Interfaces

Abstract interface for interactions with system for launching jobs.

class fermipy.jobs.sys_interface.SysInterface(**kwargs)[source]

Bases: object

Base class to handle job dispatching interface

classmethod check_job(job_details)[source]

Check the status of a specific job

clean_jobs(link, job_dict=None, clean_all=False)[source]

Clean up all the jobs associated with this link.

Returns a JobStatus enum

dispatch_job(link, key, job_archive, stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>)[source]

Function to dispatch a single job

Parameters:
  • link (Link) – Link object that sends the job
  • key (str) – Key used to identify this particular job
  • job_archive (JobArchive) – Archive used to keep track of jobs
Returns: JobDetails object
dispatch_job_hook(link, key, job_config, logfile, stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>)[source]

Hook to dispatch a single job

string_exited = 'Exited with exit code'
string_successful = 'Successfully completed'

submit_jobs(link, job_dict=None, job_archive=None, stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>)[source]

Run the Link with all of the items in job_dict as input.

If job_dict is None, the job_dict will be taken from link.jobs

Returns a JobStatus enum

fermipy.jobs.sys_interface.check_log(logfile, exited='Exited with exit code', successful='Successfully completed')[source]

Check a log file to determine the status of an LSF job

Often the logfile doesn’t exist because the job hasn’t begun to run. It is unclear what you want to do in that case…

Parameters:
  • logfile (str) – String with path to logfile
  • exited (str) – Value to check for in existing logfile for exit with failure
  • successful (str) – Value to check for in existing logfile for success
Returns: str, one of 'Pending', 'Running', 'Done', 'Failed'
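
A usage sketch, taking the documented return values at face value; the logfile name is illustrative:

    from fermipy.jobs.sys_interface import check_log

    status = check_log('analyze_target_a.log')   # illustrative logfile name
    if status == 'Failed':
        print('Job failed; inspect the logfile for the exit message')
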
fermipy.jobs.sys_interface.clean_job(logfile, outfiles, dry_run=False)[source]

Removes the log file and files created by failed jobs.

If dry_run is True, print the names of the files to be removed, but do not remove them.

fermipy.jobs.sys_interface.remove_file(filepath, dry_run=False)[source]

Remove the file at filepath

Catches the exception raised if the file does not exist.

If dry_run is True, print the name of the file to be removed, but do not remove it.

Implementation of the ScatterGather interface for running jobs on the native (local) system

class fermipy.jobs.native_impl.NativeInterface(**kwargs)[source]

Bases: fermipy.jobs.sys_interface.SysInterface

Implementation of ScatterGather that uses the native system

dispatch_job_hook(link, key, job_config, logfile, stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>)[source]

Send a single job to be executed

Parameters:
  • link (fermipy.jobs.chain.Link) – The link used to invoke the command we are running
  • key (str) – A string that identifies this particular instance of the job
  • job_config (dict) – A dictionary with the arguments for the job. Used with the self._command_template job template
  • logfile (str) – The logfile for this job, may be used to check for success/failure
string_exited = 'Exited with exit code'
string_successful = 'Successfully completed'
submit_jobs(link, job_dict=None, job_archive=None, stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>)[source]

Submit all the jobs in job_dict

fermipy.jobs.native_impl.get_native_default_args()[source]

Get the correct set of batch job arguments.

Implementation of ScatterGather interface class for dealing with LSF batch jobs at SLAC

class fermipy.jobs.slac_impl.SlacInterface(**kwargs)[source]

Bases: fermipy.jobs.sys_interface.SysInterface

Implementation of ScatterGather that uses LSF

dispatch_job_hook(link, key, job_config, logfile, stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>)[source]

Send a single job to the LSF batch

Parameters:
  • link (fermipy.jobs.chain.Link) – The link used to invoke the command we are running
  • key (str) – A string that identifies this particular instance of the job
  • job_config (dict) – A dictionary with the arguments for the job. Used with the self._command_template job template
  • logfile (str) – The logfile for this job, may be used to check for success/failure
string_exited = 'Exited with exit code'
string_successful = 'Successfully completed'
submit_jobs(link, job_dict=None, job_archive=None, stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>)[source]

Submit all the jobs in job_dict

fermipy.jobs.slac_impl.build_bsub_command(command_template, lsf_args)[source]

Build and return an LSF batch command template

The structure will be ‘bsub -s <key> <value> <command_template>’
where <key> and <value> refer to items in lsf_args

fermipy.jobs.slac_impl.get_lsf_status()[source]

Count and print the number of jobs in various LSF states

fermipy.jobs.slac_impl.get_slac_default_args(job_time=1500)[source]

Get the default set of batch job arguments used at SLAC.

Parameters: job_time (int) – Expected max length of the job, in seconds. This is used to select the batch queue and set the job_check_sleep parameter that sets how often we check for job completion.
fermipy.jobs.slac_impl.make_gpfs_path(path)[source]

Make a gpfs version of a file path. This just puts /gpfs at the beginning instead of /nfs

fermipy.jobs.slac_impl.make_nfs_path(path)[source]

Make an nfs version of a file path. This just puts /nfs at the beginning instead of /gpfs

Factory module to return the default interface to the batch farm

fermipy.jobs.batch.get_batch_job_args(job_time=1500)[source]

Get the correct set of batch job arguments.

Parameters: job_time (int) – Expected max length of the job, in seconds. This is used to select the batch queue and set the job_check_sleep parameter that sets how often we check for job completion.
Returns: job_args – Dictionary of arguments used to submit a batch job
Return type: dict
fermipy.jobs.batch.get_batch_job_interface(job_time=1500)[source]

Create a batch job interface object.

Parameters: job_time (int) – Expected max length of the job, in seconds. This is used to select the batch queue and set the job_check_sleep parameter that sets how often we check for job completion.
Returns: job_interface – Object that manages interactions with the batch farm
Return type: SysInterface
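
A short sketch of the factory in use; it presumably returns the SLAC LSF interface on SLAC machines and the native interface elsewhere:

    from fermipy.jobs.batch import get_batch_job_args, get_batch_job_interface

    job_args = get_batch_job_args(job_time=1500)        # dict of submission arguments
    interface = get_batch_job_interface(job_time=1500)  # a SysInterface instance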

File Archive module

Classes and utilities to keep track of files associated with an analysis.

The main class is FileArchive, which keeps track of all the files associated with an analysis.

The FileHandle helper class encapsulates information on a particular file.

class fermipy.jobs.file_archive.FileArchive(**kwargs)[source]

Bases: object

Class that keeps track of the status of files used in an analysis

base_path

Return the base file path for all files in this FileArchive

classmethod build_archive(**kwargs)[source]

Return the singleton FileArchive instance, building it if needed

cache

Return the transient representation of this FileArchive

classmethod get_archive()[source]

Return the singleton FileArchive instance

get_file_ids(file_list, creator=None, status=0, file_dict=None)[source]

Get or create a list of file ids based on file names

Parameters:
  • file_list (list) – The paths to the files
  • creator (int) – A unique key for the job that created these files
  • status (FileStatus) – Enumeration giving current status of files
  • file_dict (FileDict) – Mask giving flags set on these files
Returns: list of integers
get_file_paths(id_list)[source]

Get a list of file paths based on a set of ids

Parameters:
  • id_list (list) – List of integer file keys
Returns: list of file paths
get_handle(filepath)[source]

Get the FileHandle object associated with a particular file

register_file(filepath, creator, status=0, flags=0)[source]

Register a file in the archive.

If the file already exists, this raises a KeyError

Parameters:
  • filepath (str) – The path to the file
  • creator (int) – A unique key for the job that created this file
  • status (FileStatus) – Enumeration giving current status of file
  • flags (FileFlags) – Enumeration giving flags set on this file
Returns: FileHandle
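
A sketch of the registration cycle, assuming the singleton archive has already been built elsewhere (the build_archive keyword arguments are site-specific and omitted); the file path and creator key are illustrative:

    from fermipy.jobs.file_archive import FileArchive, FileStatus

    archive = FileArchive.get_archive()   # singleton, built earlier via build_archive
    archive.register_file('fits/target_a_sed.fits', creator=1,
                          status=FileStatus.expected)
    # ... after the job has written the file ...
    archive.update_file('fits/target_a_sed.fits', creator=1,
                        status=FileStatus.exists)
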
table

Return the persistent representation of this FileArchive

table_file

Return the path to the file used to persist this FileArchive

update_file(filepath, creator, status)[source]

Update a file in the archive

If the file does not exist, this raises a KeyError

Parameters:
  • filepath (str) – The path to the file
  • creator (int) – A unique key for the job that created this file
  • status (FileStatus) – Enumeration giving current status of file
Returns: FileHandle
update_file_status()[source]

Update the status of all the files in the archive

write_table_file(table_file=None)[source]

Write the table to self._table_file

class fermipy.jobs.file_archive.FileDict(**kwargs)[source]

Bases: object

Small class to keep track of files used and created by a link.

Parameters:
  • file_args (dict) – Dictionary mapping argument to FileFlags enum
  • file_dict (dict) – Dictionary mapping file path to FileFlags enum
chain_input_files

Return a list of the input files needed by this chain.

For Link sub-classes this will return only those files that were not created by any internal Link

chain_output_files

Return a list of all the output files produced by this link.

For Link sub-classes this will return only those files that were not marked as internal files or marked for removal.

gzip_files

Return a list of the files compressed by this link.

This returns all files that were explicitly marked for compression.

input_files

Return a list of the input files needed by this link.

For Link sub-classes this will return the union of all the input files of each internal Link.

That is to say this will include files produced by one Link in a Chain and used as input to another Link in the Chain

input_files_to_stage

Return a list of the input files needed by this link.

For Link sub-classes this will return the union of all the input files of each internal Link.

That is to say this will include files produced by one Link in a Chain and used as input to another Link in the Chain

internal_files

Return a list of the intermediate files produced by this link.

This returns all files that were explicitly marked as internal files.

items()[source]

Return an iterator over self.file_dict

latch_file_info(args)[source]

Extract the file paths from a set of arguments

output_files

Return a list of the output files produced by this link.

For Link sub-classes this will return the union of all the output files of each internal Link.

That is to say this will include files produced by one Link in a Chain and used as input to another Link in the Chain

output_files_to_stage

Return a list of the output files produced by this link.

For Link sub-classes this will return the union of all the output files of each internal Link.

That is to say this will include files produced by one Link in a Chain and used as input to another Link in the Chain

print_chain_summary(stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>, indent='')[source]

Print a summary of the files in this file dict.

This version uses chain_input_files and chain_output_files to count the input and output files.

print_summary(stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>, indent='')[source]

Print a summary of the files in this file dict.

This version explicitly counts the union of all input and output files.

temp_files

Return a list of the temporary files produced by this link.

This returns all files that were explicitly marked for removal.

update(file_dict)[source]

Update self with values from a dictionary mapping file path [str] to FileFlags enum

class fermipy.jobs.file_archive.FileFlags[source]

Bases: object

Bit masks to indicate file types

gz_mask = 8
in_ch_mask = 23
in_stage_mask = 33
input_mask = 1
internal_mask = 16
no_flags = 0
out_ch_mask = 22
out_stage_mask = 34
output_mask = 2
rm_mask = 4
rmint_mask = 20
stageable = 32
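
The composite masks are bitwise combinations of the base flags; a few identities implied by the values above:

    from fermipy.jobs.file_archive import FileFlags

    assert FileFlags.in_stage_mask == FileFlags.input_mask | FileFlags.stageable
    assert FileFlags.out_stage_mask == FileFlags.output_mask | FileFlags.stageable
    assert FileFlags.rmint_mask == FileFlags.rm_mask | FileFlags.internal_mask

    # Test whether a set of flags marks a file as stageable input:
    flags = FileFlags.in_stage_mask
    is_staged_input = bool(flags & FileFlags.input_mask) and bool(flags & FileFlags.stageable)
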
class fermipy.jobs.file_archive.FileHandle(**kwargs)[source]

Bases: object

Class to keep track of information about a file.

Parameters:
  • key (int) – Unique id for this particular file
  • creator (int) – Unique id for the job that created this file
  • timestamp (int) – File creation time cast as an int
  • status (FileStatus) – Enum giving current status of file
  • flags (FileFlags) – Mask giving flags set on this file
  • path (str) – Path to file
append_to_table(table)[source]

Add this instance as a row in an astropy.table.Table

check_status(basepath=None)[source]

Check on the status of this particular file

classmethod create_from_row(table_row)[source]

Build and return a FileHandle from an astropy.table.row.Row

classmethod make_dict(table)[source]

Build and return a dict of FileHandle from an astropy.table.Table

The dictionary is keyed by FileHandle.key, which is a unique integer for each file

static make_table(file_dict)[source]

Build and return an astropy.table.Table to store FileHandle objects

update_table_row(table, row_idx)[source]

Update the values in an astropy.table.Table for this instance

class fermipy.jobs.file_archive.FileStageManager(scratchdir, workdir)[source]

Bases: object

Small class to deal with staging files to and from a scratch area

construct_scratch_path(dirname, basename)[source]

Construct and return a path in the scratch area.

This will be <self.scratchdir>/<dirname>/<basename>

static copy_from_scratch(file_mapping, dry_run=True)[source]

Copy output files from scratch area

static copy_to_scratch(file_mapping, dry_run=True)[source]

Copy input files to scratch area

get_scratch_path(local_file)[source]

Construct and return a path in the scratch area from a local file.

static make_scratch_dirs(file_mapping, dry_run=True)[source]

Make any directories needed in the scratch area

map_files(local_files)[source]

Build a dictionary mapping local paths to scratch paths.

Parameters:
  • local_files (list) – List of filenames to be mapped to scratch area
Returns: dict – Mapping local_file to the full path of the scratch file
split_local_path(local_file)[source]

Split the local path into a directory name and a file name

If local_file is in self.workdir or a subdirectory of it, the directory will consist of the relative path from workdir.

If local_file is not in self.workdir, the directory will be empty.

Returns (dirname, basename)
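
A sketch with illustrative directories; with dry_run=True the static helpers only print the operations they would perform:

    from fermipy.jobs.file_archive import FileStageManager

    stager = FileStageManager('/scratch/user', '/home/user/analysis')
    mapping = stager.map_files(['fits/ccube.fits'])      # local path -> scratch path
    FileStageManager.make_scratch_dirs(mapping, dry_run=True)
    FileStageManager.copy_to_scratch(mapping, dry_run=True)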

class fermipy.jobs.file_archive.FileStatus[source]

Bases: object

Enumeration of file status types

exists = 2
expected = 1
missing = 3
no_file = 0
superseded = 4
temp_removed = 5
fermipy.jobs.file_archive.get_timestamp()[source]

Get the current time as an integer

fermipy.jobs.file_archive.get_unique_match(table, colname, value)[source]

Get the row matching value for a particular column. If exactly one row matches, return the index of that row; otherwise raise a KeyError.

fermipy.jobs.file_archive.main_browse()[source]

Entry point for command line use for browsing a FileArchive

Job Archive module

Classes and utilities to keep track of the various jobs running in an analysis pipeline.

The main class is JobArchive, which keeps track of all the jobs associated with an analysis.

The JobDetails helper class encapsulates information on an instance of running a job.

class fermipy.jobs.job_archive.JobArchive(**kwargs)[source]

Bases: object

Class that keeps track of all the jobs associated with an analysis.

Parameters:
  • table_file (str) – Path to the file used to persist this JobArchive
  • table (astropy.table.Table) – Persistent representation of this JobArchive
  • table_ids (astropy.table.Table) – Ancillary table with information about file ids
  • file_archive (FileArchive) – Archive with information about all the files used and produced by this analysis
classmethod build_archive(**kwargs)[source]

Return the singleton JobArchive instance, building it if needed

classmethod build_temp_job_archive()[source]

Build and return a JobArchive using default locations for the persistent files.

cache

Return the transient representation of this JobArchive

file_archive

Return the FileArchive with information about all the files used and produced by this analysis

classmethod get_archive()[source]

Return the singleton JobArchive instance

get_details(jobname, jobkey)[source]

Get the JobDetails associated with a particular job instance

make_job_details(row_idx)[source]

Create a JobDetails from an astropy.table.row.Row

register_job(job_details)[source]

Register a job in this JobArchive

register_job_from_link(link, key, **kwargs)[source]

Register a job in the JobArchive from a Link object

register_jobs(job_dict)[source]

Register a bunch of jobs in this archive

remove_jobs(mask)[source]

Mark all jobs that match a mask as ‘removed’

table

Return the persistent representation of this JobArchive

table_file

Return the path to the file used to persist this JobArchive

table_ids

Return the persistent representation of the ancillary information of this JobArchive

update_job(job_details)[source]

Update a job in the JobArchive

update_job_status(checker_func)[source]

Update the status of all the jobs in the archive

write_table_file(job_table_file=None, file_table_file=None)[source]

Write the table to self._table_file

class fermipy.jobs.job_archive.JobDetails(**kwargs)[source]

Bases: object

A simple structure to keep track of the details of each of the sub-process jobs.

Parameters:
  • dbkey (int) – A unique key to identify this job
  • jobname (str) – A name used to identify this job
  • jobkey (str) – A string to identify this instance of the job
  • appname (str) – The executable invoked to run the job
  • logfile (str) – The logfile for this job, may be used to check for success/failure
  • job_config (dict) – A dictionary with the arguments for the job
  • parent_id (int) – Unique key identifying the parent job
  • infile_ids (list of int) – Keys to identify input files to this job
  • outfile_ids (list of int) – Keys to identify output files from this job
  • rmfile_ids (list of int) – Keys to identify temporary files removed by this job
  • intfile_ids (list of int) – Keys to identify internal files
  • status (int) – Current job status, one of the enums above
append_to_tables(table, table_ids)[source]

Add this instance as a row in an astropy.table.Table

check_status_logfile(checker_func)[source]

Check on the status of this particular job using the logfile

classmethod create_from_row(table_row)[source]

Create a JobDetails from an astropy.table.row.Row

fullkey

Return the fullkey for this job: fullkey = <jobkey>@<jobname>

get_file_ids(file_archive, creator=None, status=0)[source]

Fill the file id arrays from the file lists

Parameters:
  • file_archive (FileArchive) – Used to look up file ids
  • creator (int) – A unique key for the job that created these files
  • status (FileStatus) – Enumeration giving the current status of these files
get_file_paths(file_archive, file_id_array)[source]

Get the full paths of the files used by this object from the id arrays

Parameters:
  • file_archive (FileArchive) – Used to look up file ids
  • file_id_array (numpy.array) – Array that remaps the file indexes
classmethod make_dict(table)[source]

Build a dictionary mapping int to JobDetails from an astropy.table.Table

static make_fullkey(jobname, jobkey='__top__')[source]

Combine jobname and jobkey to make a unique key: fullkey = <jobkey>@<jobname>

static make_tables(job_dict)[source]

Build and return an astropy.table.Table to store JobDetails

static split_fullkey(fullkey)[source]

Split fullkey to extract jobname and jobkey: fullkey = <jobkey>@<jobname>
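
The convention, shown round-trip with illustrative names:

    from fermipy.jobs.job_archive import JobDetails

    fullkey = JobDetails.make_fullkey('analyze-roi', 'target_a')
    # fullkey == 'target_a@analyze-roi', per the <jobkey>@<jobname> convention
    parts = JobDetails.split_fullkey(fullkey)   # recovers the jobkey and jobname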

topkey = '__top__'
update_table_row(table, row_idx)[source]

Update the values in an astropy.table.Table row for this instance

class fermipy.jobs.job_archive.JobStatus[source]

Bases: object

Enumeration of job status types

done = 5
failed = 6
no_job = -1
not_ready = 1
partial_failed = 7
pending = 3
ready = 2
removed = 8
running = 4
unknown = 0
class fermipy.jobs.job_archive.JobStatusVector[source]

Bases: object

Vector that counts the statuses of jobs and returns an overall status flag based on those counts

get_status()[source]

Return an overall status based on the number of jobs in various states.

n_done

Return the number of successfully completed jobs

n_failed

Return the number of failed jobs

n_pending

Return the number of jobs submitted to the batch, but not yet running

n_running

Return the number of running jobs

n_total

Return the total number of jobs

n_waiting

Return the number of jobs in various waiting states

reset()[source]

Reset the counters
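
A sketch of reading a vector back after a run; the counters themselves are filled by the job-checking machinery (e.g. ScatterGather.check_status above):

    from fermipy.jobs.job_archive import JobStatusVector

    vect = JobStatusVector()
    vect.reset()                 # zero all the counters
    # ... counters are filled by the job-checking machinery during a run ...
    if vect.n_failed > 0:
        print('%i of %i jobs failed' % (vect.n_failed, vect.n_total))
    overall = vect.get_status()  # single JobStatus flag summarizing the vector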

fermipy.jobs.job_archive.main_browse()[source]

Entry point for command line use for browsing a JobArchive