fermipy.jobs subpackage
The fermipy.jobs sub-package is a lightweight, largely standalone package for managing data analysis pipelines. It allows the user to build up increasingly complex analysis pipelines from single applications that are callable either from inside Python or from the Unix command line.
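For orientation, here is a minimal sketch of that dual usage pattern, using the PlotCastro link documented later on this page; the file names are hypothetical placeholders.

```python
# A minimal sketch: drive a Link sub-class from Python. PlotCastro is
# documented under "High-level analysis classes" below; the file names
# are placeholders.
from fermipy.jobs.target_plotting import PlotCastro

link = PlotCastro()
link.update_args(dict(infile='target_sed.fits', outfile='target_sed.png'))
print(link.formatted_command())   # the equivalent Unix command line
link.run()                        # run the application from Python
```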
Subpackage contents
- Link objects
- Link sub-classes
- Using Links and sub-classes in python
- Using fermipy.jobs to analyze multiple ROIs
- Overview
- Master Configuration File
- Additional Configuration files
- Fermipy Analysis Configuration Yaml
- Profile Alias Configuration Yaml
- Simulation Scenario Configuration Yaml
- Random Direction Control Sample Configuration Yaml
- Preparing the analysis areas
- Baseline target analysis
- Target SED analysis
- Simulated realizations of ROI analysis
- Random Direction Control Studies
- Plotting Results
Module contents
Link class and trivial sub-classes
- class fermipy.jobs.link.Link(**kwargs)[source]
Bases:
object
A wrapper for a command line application.
This class keeps track of the arguments to pass to the application, as well as its input and output files.
It can be used either with other Link objects to build a Chain, or as a standalone wrapper to pass configuration to the application. Derived classes will need to override the appname and linkname_default class parameters (a subclassing sketch follows this class entry).
- Parameters:
default_options (dict) – Dictionary with options, defaults, help strings and types for the parameters associated with the Link
default_file_args (dict) – Dictionary specifying whether particular parameters are associated with input or output files.
linkname (str) – Name of this Link, used as a key to find it in a Chain.
link_prefix (str) – Optional prefix for this Link, used to distinguish between similar Link objects on different Chain objects.
args (dict) – Up-to-date dictionary with the arguments that will be passed to the application
_options (dict) – Dictionary with the options that we are allowed to set, and their default values
files (FileDict) – Object that keeps track of input and output files
jobs (OrderedDict) – Dictionary mapping keys to JobDetails. This contains information about all the batch jobs associated with this Link
- appname = 'dummy'
- property arg_names
Return the list of arg names
- check_input_files(return_found=True, return_missing=True)[source]
Check if input files exist.
- check_job_status(key='__top__', fail_running=False, fail_pending=False, force_check=False)[source]
Check the status of a particular job
By default this checks the status of the top-level job, but it can be made to drill into the sub-jobs.
- Returns:
status – Job status flag
- Return type:
JobStatus
- check_jobs_status(fail_running=False, fail_pending=False)[source]
Check the status of all the jobs run from this link and return a status flag that summarizes them.
- check_output_files(return_found=True, return_missing=True)[source]
Check if output files exist.
- clean_jobs(recursive=False)[source]
Clean out all of the jobs associated with this link.
For sub-classes, if recursive is True this also cleans jobs from any internal Link objects.
- clear_jobs(recursive=True)[source]
Clear the self.jobs dictionary that contains information about jobs associated with this Link.
For sub-classes, if recursive is True this also clears jobs from any internal Link objects.
- command_template()[source]
Build and return a string that can be used as a template for invoking this chain from the command line.
The actual command can be obtained by using self.command_template().format(**self.args).
- default_file_args = {}
- default_options = {}
- description = 'Link to run dummy'
- formatted_command()[source]
Build and return the formatted command for this Link.
This is exactly the command as called from the Unix command line.
- property full_linkname
Return the linkname with the prefix attached.
This is useful to distinguish between links on different Chain objects.
- get_failed_jobs(fail_running=False, fail_pending=False)[source]
Return a dictionary with the subset of jobs that are marked as failed
- get_jobs(recursive=True)[source]
Return a dictionary with all the jobs.
For sub-classes, if recursive is True this will include jobs from any internal Link objects.
- linkname_default = 'dummy'
- missing_input_files()[source]
Make and return a dictionary of the missing input files.
This returns a dictionary mapping filepath to the list of Link objects that use the file as input.
- missing_output_files()[source]
Make and return a dictionary of the missing output files.
This returns a dictionary mapping filepath to list of links that produce the file as output.
- print_summary(stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>, indent='', recurse_level=2)[source]
Print a summary of the activity done by this Link.
- run(stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>, dry_run=False, stage_files=True, resubmit_failed=False)[source]
Runs this Link.
This version is intended to be overridden by sub-classes so as to provide a single function that behaves the same for all versions of Link.
- run_command(stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>, dry_run=False)[source]
Runs the command for this link. This method can be overridden by sub-classes to invoke a different command
- run_with_log(dry_run=False, stage_files=True, resubmit_failed=False)[source]
Runs this link with output sent to a pre-defined logfile
- topkey = '__top__'
- update_args(override_args)[source]
Update the arguments used to invoke the application
Note that this will also update the dictionary of input and output files.
- Parameters:
override_args (dict) – Dictionary of arguments to override the current values
- usage = 'dummy [options]'
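As a hedged illustration of the sub-classing pattern described above, here is a minimal sketch of a custom Link; the application name and options are hypothetical, and real sub-classes in fermipy.jobs may also define additional attributes such as default_file_args.

```python
# A minimal Link sub-class sketch. 'my-app' is a hypothetical
# executable; options follow the (default, help string, type)
# convention used throughout this page.
from fermipy.jobs.link import Link

class MyAppLink(Link):
    """Link that wraps the hypothetical 'my-app' executable."""
    appname = 'my-app'
    linkname_default = 'my-app'
    usage = '%s [options]' % appname
    description = 'Link to run my-app'
    default_options = dict(
        infile=(None, 'Path to input file.', str),
        outfile=(None, 'Path to output file.', str),
        dry_run=(False, 'Print commands but do not run them.', bool),
    )

link = MyAppLink()
link.update_args(dict(infile='in.fits', outfile='out.fits'))
print(link.command_template().format(**link.args))
```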
Utilities to chain together a series of ScienceTools apps
- class fermipy.jobs.gtlink.Gtlink(**kwargs)[source]
Bases:
Link
A wrapper for a single ScienceTools application.
This class keeps track of the arguments to pass to the application, as well as its input and output files.
It can be used either with other Link objects to build a Chain, or as a standalone wrapper to pass configuration to the application. See the help for chain.Link for additional details.
- appname = 'dummy'
- command_template()[source]
Build and return a string that can be used as a template for invoking this chain from the command line.
The actual command can be obtained by using self.command_template().format(**self.args).
- description = 'Link to run dummy'
- linkname_default = 'dummy'
- run_command(stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>, dry_run=False)[source]
Runs the command for this link. This method can be overridden by sub-classes to invoke a different command
- Parameters:
stream (file) – Must have ‘write’ function
dry_run (bool) – Print command but do not run it
- update_args(override_args)[source]
Update the arguments used to invoke the application.
See the help for chain.Link for details. This calls the base class function, then fills the parameters of the GtApp object.
- usage = 'dummy [options]'
- fermipy.jobs.gtlink.build_gtapp(appname, dry_run, **kwargs)[source]
Build an object that can run a ScienceTools application
- fermipy.jobs.gtlink.extract_parameters(pil, keys=None)[source]
Extract and return parameter names and values from a pil object
- fermipy.jobs.gtlink.run_gtapp(gtapp, stream, dry_run, **kwargs)[source]
Runs one of the ScienceTools apps (a usage sketch follows this module's functions)
Taken from fermipy.gtanalysis.run_gtapp by Matt Wood
- Parameters:
gtapp (GtApp.GtApp object) – The application (e.g., gtbin)
stream (stream object) – Must have ‘write’ function
dry_run (bool) – Print command but do not run it
kwargs (arguments used to invoke the application)
- fermipy.jobs.gtlink.update_gtapp(gtapp, **kwargs)[source]
Update the parameters of the object that can run ScienceTools applications
- Parameters:
gtapp (GtApp.GtApp) – Object that will run the application in question
kwargs (arguments used to invoke the application)
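Using only the signatures documented above, a hedged sketch of these helpers might look like the following; it requires the Fermi ScienceTools, and the gtbin arguments are illustrative placeholders.

```python
# Build a ScienceTools application wrapper, update its parameters, and
# run it; the gtbin argument values are placeholders.
import sys
from fermipy.jobs.gtlink import build_gtapp, update_gtapp, run_gtapp

gtapp = build_gtapp('gtbin', dry_run=False,
                    evfile='events.fits', outfile='ccube.fits')
update_gtapp(gtapp, algorithm='CCUBE')      # adjust parameters later
run_gtapp(gtapp, sys.stdout, dry_run=True)  # print the command only
```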
- class fermipy.jobs.app_link.AppLink(**kwargs)[source]
Bases:
Link
A wrapper for a single fermipy application.
This class keeps track of the arguments to pass to the application, as well as its input and output files.
It can be used either with other Link objects to build a Chain, or as a standalone wrapper to pass configuration to the application. See the help for Link for additional details.
- appname = 'dummy'
- description = 'Link to run dummy'
- linkname_default = 'dummy'
- usage = 'dummy [options]'
ScatterGather class
- class fermipy.jobs.scatter_gather.ScatterGather(link, **kwargs)[source]
Bases:
Link
Class to dispatch several jobs in parallel and to collect and merge the results.
Sub-classes will need to generate configurations for the jobs that they launch (a minimal sketch follows this class entry).
- appname = 'dummy-sg'
- build_job_configs(args)[source]
Hook to build job configurations
Sub-class implementation should return:
- job_configs : dict
Dictionary of dictionaries passed to the parallel jobs
- check_status(stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>, check_once=False, fail_pending=False, fail_running=False, no_wait=False, do_print=True, write_status=False)[source]
Loop to check on the status of all the jobs in job dict.
- Parameters:
stream (file) – Stream that this function will print to; must have a ‘write’ function.
check_once (bool) – Check status once and exit the loop.
fail_pending (bool) – If True, consider pending jobs as failed.
fail_running (bool) – If True, consider running jobs as failed.
no_wait (bool) – Do not sleep before checking jobs.
do_print (bool) – Print summary stats.
write_status (bool) – Write the status to the log file.
- Returns:
status_vect – Vector that summarizes the number of jobs in various states.
- Return type:
JobStatusVector
- clean_jobs(recursive=False)[source]
Clean up all the jobs associated with this object.
If recursive is True this also cleans jobs dispatched by this object.
- clear_jobs(recursive=True)[source]
Clear the self.jobs dictionary that contains information about jobs associated with this ScatterGather.
If recursive is True this will include jobs from all internal Link objects.
- clientclass = None
- classmethod create(**kwargs)[source]
Build and return a ScatterGather object.
- default_options = {}
- default_options_base = {'action': ('run', 'Action to perform', <class 'str'>), 'check_status_once': (False, 'Check status only once before proceeding', <class 'bool'>), 'dry_run': (False, 'Print commands, but do not execute them', <class 'bool'>), 'job_check_sleep': (300, 'Sleep time between checking on job status (s)', <class 'int'>), 'print_update': (False, 'Print summary of job status', <class 'bool'>)}
- default_prefix_logfile = 'scatter'
- description = 'Run multiple analyses'
- get_jobs(recursive=True)[source]
Return a dictionary with all the jobs.
If recursive is True this will include jobs from all internal Link objects.
- job_time = 1500
- print_failed(stream=<_io.TextIOWrapper name='<stderr>' mode='w' encoding='utf-8'>)[source]
Print list of the failed jobs
- print_summary(stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>, indent='', recurse_level=2)[source]
Print a summary of the activity done by this Link.
- print_update(stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>, job_stats=None)[source]
Print an update about the current number of jobs running
- resubmit(stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>, fail_running=False, resubmit_failed=False)[source]
Function to resubmit failed jobs and collect results
- Returns:
status_vect – Vector that summarizes the number of jobs in various states.
- Return type:
JobStatusVector
- run(stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>, dry_run=False, stage_files=True, resubmit_failed=True)[source]
Runs this Link.
This version is intended to be overridden by sub-classes so as to provide a single function that behaves the same for all versions of Link.
- run_jobs(stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>, resubmit_failed=False)[source]
Function to dispatch jobs and collect results
- Parameters:
stream (file) – Stream that this function will print to; must have a ‘write’ function.
resubmit_failed (bool) – Resubmit failed jobs.
- Returns:
status_vect – Vector that summarizes the number of jobs in various states.
- Return type:
JobStatusVector
- property scatter_link
Return the Link object used in the scatter phase of processing.
- update_args(override_args)[source]
Update the arguments used to invoke the application
Note that this will also update the dictionary of input and output files
- Parameters:
override_args (dict) – dictionary of arguments to override the current values
- usage = 'dummy-sg [options]'
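Here is a minimal, hedged sketch of the sub-classing pattern, built from the hook and class attributes documented above; AnalyzeROI is the documented client Link, while the target names and file paths are placeholders.

```python
# A minimal ScatterGather sub-class: dispatch one AnalyzeROI job per
# target. Target names and file paths are hypothetical.
from fermipy.jobs.scatter_gather import ScatterGather
from fermipy.jobs.target_analysis import AnalyzeROI

class MyAnalyze_SG(ScatterGather):
    """Run AnalyzeROI over a list of targets."""
    appname = 'my-analyze-sg'
    usage = '%s [options]' % appname
    description = 'Run analyses on a series of ROIs'
    clientclass = AnalyzeROI
    job_time = 1500
    default_options = dict(
        targetlist=(None, 'Path to the target list.', str),
    )

    def build_job_configs(self, args):
        # Return a dictionary of per-job argument dictionaries, keyed
        # by job key (here, the target name). A real implementation
        # would read the targets from args['targetlist'].
        job_configs = {}
        for target in ['target_0', 'target_1']:
            job_configs[target] = dict(config='%s/config.yaml' % target,
                                       make_plots=True)
        return job_configs
```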
Chain class
- class fermipy.jobs.chain.Chain(**kwargs)[source]
Bases:
Link
An object tying together a series of applications into a single application.
This class keeps track of the arguments to pass to the applications, as well as their input and output files.
Note that this class is itself a Link. This allows you to write a Python module that implements a chain and also has a __main__ function, so that it can be called from the shell (an inspection sketch follows this class entry).
- check_links_status(fail_running=False, fail_pending=False)[source]
Check the status of all the jobs run from the Link objects in this Chain and return a status flag that summarizes them.
- clear_jobs(recursive=True)[source]
Clear the dictionary with all the jobs.
If recursive is True this will include jobs from all internal Link objects.
- get_jobs(recursive=True)[source]
Return a dictionary with all the jobs.
If recursive is True this will include jobs from all internal Link objects.
- missing_input_files()[source]
Make and return a dictionary of the missing input files.
This returns a dictionary mapping filepath to the list of Link objects that use the file as input.
- missing_output_files()[source]
Make and return a dictionary of the missing output files.
This returns a dictionary mapping filepath to list of links that produce the file as output.
- print_status(indent='', recurse=False)[source]
Print a summary of the job status for each Link in this Chain.
- print_summary(stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>, indent='', recurse_level=2)[source]
Print a summary of the activity done by this Chain.
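How a concrete chain registers its Link objects is internal to each sub-class and not documented here; the following sketch therefore only exercises the inspection methods listed above, on a trivial chain with no links.

```python
# Inspecting a Chain using only the documented methods. EmptyChain is
# a trivial sub-class for illustration; real chains register their
# links internally in their constructors.
import sys
from fermipy.jobs.chain import Chain

class EmptyChain(Chain):
    appname = 'empty-chain'
    linkname_default = 'empty-chain'
    usage = '%s [options]' % appname
    description = 'A chain with no links, for illustration only'

chain = EmptyChain()
chain.run(sys.stdout, dry_run=True)  # nothing to run, but exercises the API
chain.print_status(recurse=True)
print(chain.missing_input_files())   # {} for an empty chain
```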
High-level analysis classes
These are Link sub-classes that implement fermipy analyses, or that perform tasks related to fermipy analyses, such as plotting or collecting results for a set of simulations.
- class fermipy.jobs.target_analysis.AnalyzeROI(**kwargs)[source]
Bases:
Link
Small class that wraps an analysis script.
This particular script does baseline fitting of an ROI.
- Parameters:
config (<class 'str'>) – Path to fermipy config file. [None]
roi_baseline (<class 'str'>) – Key for roi baseline file. [fit_baseline]
make_plots (<class 'bool'>) – Make plots [False]
- appname = 'fermipy-analyze-roi'
- default_options = {'config': (None, 'Path to fermipy config file.', <class 'str'>), 'make_plots': (False, 'Make plots', <class 'bool'>), 'roi_baseline': ('fit_baseline', 'Key for roi baseline file.', <class 'str'>)}
- description = 'Run analysis of a single ROI'
- linkname_default = 'analyze-roi'
- usage = 'fermipy-analyze-roi [options]'
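As a hedged sketch, this link can be run from Python with the options documented above; the config path is a placeholder, and the command-line spelling in the comment is an assumption.

```python
# Roughly equivalent to the command line:
#   fermipy-analyze-roi --config path/to/config.yaml --make_plots
# (exact option spelling on the command line is an assumption).
from fermipy.jobs.target_analysis import AnalyzeROI

link = AnalyzeROI()
link.update_args(dict(config='path/to/config.yaml',
                      roi_baseline='fit_baseline',
                      make_plots=True))
link.run_with_log()  # run, sending output to the link's logfile
```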
- class fermipy.jobs.target_analysis.AnalyzeSED(**kwargs)[source]
Bases:
Link
Small class to wrap an analysis script.
This particular script fits an SED for a target source with respect to the baseline ROI model.
- Parameters:
config (<class 'str'>) – Path to fermipy config file. [None]
roi_baseline (<class 'str'>) – Key for roi baseline file. [fit_baseline]
skydirs (<class 'str'>) – Yaml file with blank sky directions. [None]
profiles (<class 'list'>) – List of profiles to analyze [[]]
make_plots (<class 'bool'>) – Make plots [False]
non_null_src (<class 'bool'>) – Zero out test source [False]
- appname = 'fermipy-analyze-sed'
- default_options = {'config': (None, 'Path to fermipy config file.', <class 'str'>), 'make_plots': (False, 'Make plots', <class 'bool'>), 'non_null_src': (False, 'Zero out test source', <class 'bool'>), 'profiles': ([], 'List of profiles to analyze', <class 'list'>), 'roi_baseline': ('fit_baseline', 'Key for roi baseline file.', <class 'str'>), 'skydirs': (None, 'Yaml file with blank sky directions.', <class 'str'>)}
- description = 'Extract the SED for a single target'
- linkname_default = 'analyze-sed'
- usage = 'fermipy-analyze-sed [options]'
- class fermipy.jobs.target_collect.CollectSED(**kwargs)[source]
Bases:
Link
Small class to collect SED results from a series of simulations.
- Parameters:
sed_file (<class 'str'>) – Path to SED file. [None]
outfile (<class 'str'>) – Path to output file. [None]
config (<class 'str'>) – Path to fermipy config file. [None]
summaryfile (<class 'str'>) – Path to file with results summaries. [None]
nsims (<class 'int'>) – Number of simulations to run. [20]
enumbins (<class 'int'>) – Number of energy bins [12]
seed (<class 'int'>) – Seed number for first simulation. [0]
dry_run (<class 'bool'>) – Print commands but do not run them. [False]
- appname = 'fermipy-collect-sed'
- collist = [{'name': 'e_min', 'unit': 'MeV'}, {'name': 'e_ref', 'unit': 'MeV'}, {'name': 'e_max', 'unit': 'MeV'}, {'name': 'ref_dnde_e_min', 'unit': 'cm-2 MeV-1 ph s-1'}, {'name': 'ref_dnde_e_max', 'unit': 'cm-2 MeV-1 ph s-1'}, {'name': 'ref_dnde', 'unit': 'cm-2 MeV-1 ph s-1'}, {'name': 'ref_flux', 'unit': 'cm-2 ph s-1'}, {'name': 'ref_eflux', 'unit': 'cm-2 MeV s-1'}, {'name': 'ref_npred'}, {'name': 'dnde', 'unit': 'cm-2 MeV-1 ph s-1'}, {'name': 'dnde_err', 'unit': 'cm-2 MeV-1 ph s-1'}, {'name': 'dnde_errp', 'unit': 'cm-2 MeV-1 ph s-1'}, {'name': 'dnde_errn', 'unit': 'cm-2 MeV-1 ph s-1'}, {'name': 'dnde_ul', 'unit': 'cm-2 MeV-1 ph s-1'}, {'name': 'e2dnde', 'unit': 'cm-2 MeV s-1'}, {'name': 'e2dnde_err', 'unit': 'cm-2 MeV s-1'}, {'name': 'e2dnde_errp', 'unit': 'cm-2 MeV s-1'}, {'name': 'e2dnde_errn', 'unit': 'cm-2 MeV s-1'}, {'name': 'e2dnde_ul', 'unit': 'cm-2 MeV s-1'}, {'name': 'norm'}, {'name': 'norm_err'}, {'name': 'norm_errp'}, {'name': 'norm_errn'}, {'name': 'norm_ul'}, {'name': 'ts'}]
- default_options = {'config': (None, 'Path to fermipy config file.', <class 'str'>), 'dry_run': (False, 'Print commands but do not run them.', <class 'bool'>), 'enumbins': (12, 'Number of energy bins', <class 'int'>), 'nsims': (20, 'Number of simulations to run.', <class 'int'>), 'outfile': (None, 'Path to output file.', <class 'str'>), 'sed_file': (None, 'Path to SED file.', <class 'str'>), 'seed': (0, 'Seed number for first simulation.', <class 'int'>), 'summaryfile': (None, 'Path to file with results summaries.', <class 'str'>)}
- description = 'Collect SED results from simulations'
- linkname_default = 'collect-sed'
- usage = 'fermipy-collect-sed [options]'
- class fermipy.jobs.target_sim.CopyBaseROI(**kwargs)[source]
Bases:
Link
Small class to copy a baseline ROI to a simulation area.
This is useful for parallelizing analysis using the fermipy.jobs module.
- Parameters:
ttype (<class 'str'>) – Type of target being analyzed. [None]
target (<class 'str'>) – Name of analysis target. [None]
roi_baseline (<class 'str'>) – Key for roi baseline file. [fit_baseline]
extracopy (<class 'list'>) – Extra files to copy [[]]
sim (<class 'str'>) – Name of the simulation scenario. [None]
- appname = 'fermipy-copy-base-roi'
- classmethod copy_analysis_files(orig_dir, dest_dir, copyfiles)[source]
Copy a list of files from orig_dir to dest_dir
- classmethod copy_target_dir(orig_dir, dest_dir, roi_baseline, extracopy)[source]
Create and populate directories for target analysis
- copyfiles = ['srcmap_*.fits', 'ccube.fits', 'ccube_*.fits']
- default_options = {'extracopy': ([], 'Extra files to copy', <class 'list'>), 'roi_baseline': ('fit_baseline', 'Key for roi baseline file.', <class 'str'>), 'sim': (None, 'Name of the simulation scenario.', <class 'str'>), 'target': (None, 'Name of analysis target.', <class 'str'>), 'ttype': (None, 'Type of target being analyzed.', <class 'str'>)}
- description = 'Copy a baseline ROI to a simulation area'
- linkname_default = 'copy-base-roi'
- usage = 'fermipy-copy-base-roi [options]'
- class fermipy.jobs.target_sim.RandomDirGen(**kwargs)[source]
Bases:
Link
Small class to generate random sky directions inside an ROI.
This is useful for parallelizing analysis using the fermipy.jobs module.
- Parameters:
config (<class 'str'>) – Path to fermipy config file. [None]
rand_config (<class 'str'>) – Path to config file for generating random sky directions [None]
outfile (<class 'str'>) – Path to output file. [None]
- appname = 'fermipy-random-dir-gen'
- default_options = {'config': (None, 'Path to fermipy config file.', <class 'str'>), 'outfile': (None, 'Path to output file.', <class 'str'>), 'rand_config': (None, 'Path to config file for genaration random sky dirs', <class 'str'>)}
- description = 'Generate random sky directions in an ROI'
- linkname_default = 'random-dir-gen'
- usage = 'fermipy-random-dir-gen [options]'
- class fermipy.jobs.target_sim.SimulateROI(**kwargs)[source]
Bases:
Link
Small class that wraps an analysis script.
This is useful for parallelizing analysis using the fermipy.jobs module.
- Parameters:
config (<class 'str'>) – Path to fermipy config file. [None]
roi_baseline (<class 'str'>) – Key for roi baseline file. [fit_baseline]
profiles (<class 'list'>) – List of profiles to analyze [[]]
non_null_src (<class 'bool'>) – Zero out test source [False]
do_find_src (<class 'bool'>) – Add source finding step to simulated realizations [False]
sim_profile (<class 'str'>) – Name of the profile to use for simulation. [default]
sim (<class 'str'>) – Name of the simulation scenario. [None]
nsims (<class 'int'>) – Number of simulations to run. [20]
seed (<class 'int'>) – Seed number for first simulation. [0]
- appname = 'fermipy-simulate-roi'
- default_options = {'config': (None, 'Path to fermipy config file.', <class 'str'>), 'do_find_src': (False, 'Add source finding step to simulated realizations', <class 'bool'>), 'non_null_src': (False, 'Zero out test source', <class 'bool'>), 'nsims': (20, 'Number of simulations to run.', <class 'int'>), 'profiles': ([], 'List of profiles to analyze', <class 'list'>), 'roi_baseline': ('fit_baseline', 'Key for roi baseline file.', <class 'str'>), 'seed': (0, 'Seed number for first simulation.', <class 'int'>), 'sim': (None, 'Name of the simulation scenario.', <class 'str'>), 'sim_profile': ('default', 'Name of the profile to use for simulation.', <class 'str'>)}
- description = 'Run simulated analysis of a single ROI'
- linkname_default = 'simulate-roi'
- usage = 'fermipy-simulate-roi [options]'
- class fermipy.jobs.target_plotting.PlotCastro(**kwargs)[source]
Bases:
Link
Small class to plot an SED as a ‘Castro’ plot.
- Parameters:
infile (<class 'str'>) – Path to input file. [None]
outfile (<class 'str'>) – Path to output file. [None]
- appname = 'fermipy-plot-castro'
- default_options = {'infile': (None, 'Path to input file.', <class 'str'>), 'outfile': (None, 'Path to output file.', <class 'str'>)}
- description = 'Plot likelihood v. flux normalization and energy'
- linkname_default = 'plot-castro'
- usage = 'fermipy-plot-castro [options]'
High-level analysis job dispatch
These are ScatterGather sub-classes that invoke the Link sub-classes listed above.
- class fermipy.jobs.target_analysis.AnalyzeROI_SG(link, **kwargs)[source]
Bases:
ScatterGather
Small class to generate configurations for the AnalyzeROI class.
This loops over all the targets defined in the target list.
- Parameters:
ttype (<class 'str'>) – Type of target being analyzed. [None]
targetlist (<class 'str'>) – Path to the target list. [None]
config (<class 'str'>) – Path to fermipy config file. [None]
roi_baseline (<class 'str'>) – Key for roi baseline file. [fit_baseline]
make_plots (<class 'bool'>) – Make plots [False]
- appname = 'fermipy-analyze-roi-sg'
- clientclass
alias of AnalyzeROI
- default_options = {'config': (None, 'Path to fermipy config file.', <class 'str'>), 'make_plots': (False, 'Make plots', <class 'bool'>), 'roi_baseline': ('fit_baseline', 'Key for roi baseline file.', <class 'str'>), 'targetlist': (None, 'Path to the target list.', <class 'str'>), 'ttype': (None, 'Type of target being analyzed.', <class 'str'>)}
- description = 'Run analyses on a series of ROIs'
- job_time = 1500
- usage = 'fermipy-analyze-roi-sg [options]'
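A hedged sketch of dispatching these parallel analyses from Python, using the documented create() classmethod; the target type and file paths are placeholders.

```python
# Dispatch one AnalyzeROI job per target and summarize the results.
# Argument values are hypothetical placeholders.
from fermipy.jobs.target_analysis import AnalyzeROI_SG

sg = AnalyzeROI_SG.create()
sg.update_args(dict(ttype='dSphs',
                    targetlist='dSphs/target_list.yaml',
                    config='dSphs/config.yaml'))
status_vect = sg.run_jobs(resubmit_failed=True)
print(status_vect.n_done, status_vect.n_failed)
```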
- class fermipy.jobs.target_analysis.AnalyzeSED_SG(link, **kwargs)[source]
Bases:
ScatterGather
Small class to generate configurations for this script
This loops over all the targets defined in the target list, and over all the profiles defined for each target.
- Parameters:
ttype (<class 'str'>) – Type of target being analyzed. [None]
targetlist (<class 'str'>) – Path to the target list. [None]
config (<class 'str'>) – Path to fermipy config file. [None]
roi_baseline (<class 'str'>) – Key for roi baseline file. [fit_baseline]
skydirs (<class 'str'>) – Yaml file with blank sky directions. [None]
make_plots (<class 'bool'>) – Make plots [False]
non_null_src (<class 'bool'>) – Zero out test source [False]
- appname = 'fermipy-analyze-sed-sg'
- clientclass
alias of AnalyzeSED
- default_options = {'config': (None, 'Path to fermipy config file.', <class 'str'>), 'make_plots': (False, 'Make plots', <class 'bool'>), 'non_null_src': (False, 'Zero out test source', <class 'bool'>), 'roi_baseline': ('fit_baseline', 'Key for roi baseline file.', <class 'str'>), 'skydirs': (None, 'Yaml file with blank sky directions.', <class 'str'>), 'targetlist': (None, 'Path to the target list.', <class 'str'>), 'ttype': (None, 'Type of target being analyzed.', <class 'str'>)}
- description = 'Run analyses on a series of ROIs'
- job_time = 1500
- usage = 'fermipy-analyze-sed-sg [options]'
- class fermipy.jobs.target_collect.CollectSED_SG(link, **kwargs)[source]
Bases:
ScatterGather
Small class to generate configurations for CollectSED.
This loops over all the targets defined in the target list.
- Parameters:
ttype (<class 'str'>) – Type of target being analyzed. [None]
targetlist (<class 'str'>) – Path to the target list. [None]
config (<class 'str'>) – Path to fermipy config file. [None]
sim (<class 'str'>) – Name of the simulation scenario. [None]
nsims (<class 'int'>) – Number of simulations to run. [20]
seed (<class 'int'>) – Seed number for first simulation. [0]
write_full (<class 'bool'>) – Write file with full collected results [False]
write_summary (<class 'bool'>) – Write file with summary of collected results [False]
- appname = 'fermipy-collect-sed-sg'
- clientclass
alias of CollectSED
- default_options = {'config': (None, 'Path to fermipy config file.', <class 'str'>), 'nsims': (20, 'Number of simulations to run.', <class 'int'>), 'seed': (0, 'Seed number for first simulation.', <class 'int'>), 'sim': (None, 'Name of the simulation scenario.', <class 'str'>), 'targetlist': (None, 'Path to the target list.', <class 'str'>), 'ttype': (None, 'Type of target being analyzed.', <class 'str'>), 'write_full': (False, 'Write file with full collected results', <class 'bool'>), 'write_summary': (False, 'Write file with summary of collected results', <class 'bool'>)}
- description = 'Collect SED data from a set of simulations for a series of ROIs'
- job_time = 120
- usage = 'fermipy-collect-sed-sg [options]'
- class fermipy.jobs.target_sim.CopyBaseROI_SG(link, **kwargs)[source]
Bases:
ScatterGather
Small class to generate configurations for this script.
This adds the following arguments:
- Parameters:
ttype (<class 'str'>) – Type of target being analyzed. [None]
targetlist (<class 'str'>) – Path to the target list. [None]
roi_baseline (<class 'str'>) – Key for roi baseline file. [fit_baseline]
sim (<class 'str'>) – Name of the simulation scenario. [None]
extracopy (<class 'list'>) – Extra files to copy [[]]
- appname = 'fermipy-copy-base-roi-sg'
- clientclass
alias of CopyBaseROI
- default_options = {'extracopy': ([], 'Extra files to copy', <class 'list'>), 'roi_baseline': ('fit_baseline', 'Key for roi baseline file.', <class 'str'>), 'sim': (None, 'Name of the simulation scenario.', <class 'str'>), 'targetlist': (None, 'Path to the target list.', <class 'str'>), 'ttype': (None, 'Type of target being analyzed.', <class 'str'>)}
- description = 'Run analyses on a series of ROIs'
- job_time = 60
- usage = 'fermipy-copy-base-roi-sg [options]'
- class fermipy.jobs.target_sim.RandomDirGen_SG(link, **kwargs)[source]
Bases:
ScatterGather
Small class to generate configurations for this script.
This adds the following arguments:
- Parameters:
ttype (<class 'str'>) – Type of target being analyzed. [None]
targetlist (<class 'str'>) – Path to the target list. [None]
config (<class 'str'>) – Path to fermipy config file. [None]
rand_config (<class 'str'>) – Path to config file for generating random sky directions [None]
sim (<class 'str'>) – Name of the simulation scenario. [None]
- appname = 'fermipy-random-dir-gen-sg'
- clientclass
alias of RandomDirGen
- default_options = {'config': (None, 'Path to fermipy config file.', <class 'str'>), 'rand_config': (None, 'Path to config file for genaration random sky dirs', <class 'str'>), 'sim': (None, 'Name of the simulation scenario.', <class 'str'>), 'targetlist': (None, 'Path to the target list.', <class 'str'>), 'ttype': (None, 'Type of target being analyzed.', <class 'str'>)}
- description = 'Run analyses on a series of ROIs'
- job_time = 60
- usage = 'fermipy-random-dir-gen-sg [options]'
- class fermipy.jobs.target_sim.SimulateROI_SG(link, **kwargs)[source]
Bases:
ScatterGather
Small class to generate configurations for this script.
This adds the following arguments:
- Parameters:
ttype (<class 'str'>) – Type of target being analyzed. [None]
targetlist (<class 'str'>) – Path to the target list. [None]
config (<class 'str'>) – Path to fermipy config file. [None]
roi_baseline (<class 'str'>) – Key for roi baseline file. [fit_baseline]
non_null_src (<class 'bool'>) – Zero out test source [False]
do_find_src (<class 'bool'>) – Add source finding step to simulated realizations [False]
sim (<class 'str'>) – Name of the simulation scenario. [None]
sim_profile (<class 'str'>) – Name of the profile to use for simulation. [default]
nsims (<class 'int'>) – Number of simulations to run. [20]
seed (<class 'int'>) – Seed number for first simulation. [0]
nsims_job (<class 'int'>) – Number of simulations to run per job. [0]
- appname = 'fermipy-simulate-roi-sg'
- clientclass
alias of SimulateROI
- default_options = {'config': (None, 'Path to fermipy config file.', <class 'str'>), 'do_find_src': (False, 'Add source finding step to simulated realizations', <class 'bool'>), 'non_null_src': (False, 'Zero out test source', <class 'bool'>), 'nsims': (20, 'Number of simulations to run.', <class 'int'>), 'nsims_job': (0, 'Number of simulations to run per job.', <class 'int'>), 'roi_baseline': ('fit_baseline', 'Key for roi baseline file.', <class 'str'>), 'seed': (0, 'Seed number for first simulation.', <class 'int'>), 'sim': (None, 'Name of the simulation scenario.', <class 'str'>), 'sim_profile': ('default', 'Name of the profile to use for simulation.', <class 'str'>), 'targetlist': (None, 'Path to the target list.', <class 'str'>), 'ttype': (None, 'Type of target being analyzed.', <class 'str'>)}
- description = 'Run analyses on a series of ROIs'
- job_time = 1500
- usage = 'fermipy-simulate-roi-sg [options]'
- class fermipy.jobs.target_plotting.PlotCastro_SG(link, **kwargs)[source]
Bases:
ScatterGather
Small class to generate configurations for the PlotCastro class.
This loops over all the targets defined in the target list.
- Parameters:
ttype (<class 'str'>) – Type of target being analyzed. [None]
targetlist (<class 'str'>) – Path to the target list. [None]
- appname = 'fermipy-plot-castro-sg'
- clientclass
alias of PlotCastro
- default_options = {'targetlist': (None, 'Path to the target list.', <class 'str'>), 'ttype': (None, 'Type of target being analyzed.', <class 'str'>)}
- description = 'Make castro plots for set of targets'
- job_time = 60
- usage = 'fermipy-plot-castro-sg [options]'
Batch and System Interfaces
Abstract interface for interactions with the system used to launch jobs.
- class fermipy.jobs.sys_interface.SysInterface(**kwargs)[source]
Bases:
object
Base class for the job dispatching interface
- clean_jobs(link, job_dict=None, clean_all=False)[source]
Clean up all the jobs associated with this link.
Returns a JobStatus enum.
- dispatch_job(link, key, job_archive, stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>)[source]
Function to dispatch a single job
- Parameters:
link (Link) – Link object that sends the job
key (str) – Key used to identify this particular job
job_archive (JobArchive) – Archive used to keep track of jobs
Returns a JobDetails object.
- dispatch_job_hook(link, key, job_config, logfile, stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>)[source]
Hook to dispatch a single job
- string_exited = 'Exited with exit code'
- string_successful = 'Successfully completed'
- fermipy.jobs.sys_interface.check_log(logfile, exited='Exited with exit code', successful='Successfully completed')[source]
Check a log file to determine the status of an LSF job (a usage sketch follows this module's functions).
Often the logfile doesn't exist because the job hasn't begun to run, and it is unclear what the caller wants to do in that case.
- fermipy.jobs.sys_interface.clean_job(logfile, outfiles, dry_run=False)[source]
Removes log file and files created by failed jobs.
If dry_run is True, print name of files to be removed, but do not remove them.
- fermipy.jobs.sys_interface.remove_file(filepath, dry_run=False)[source]
Remove the file at filepath
Catches exception if the file does not exist.
If dry_run is True, print name of file to be removed, but do not remove it.
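A hedged usage sketch of check_log; the logfile path is a placeholder, and the assumption that the return value is one of the JobStatus enums documented further down this page is ours.

```python
# Check an LSF logfile for success or failure markers; the path is a
# placeholder, and comparing against JobStatus is an assumption about
# the return type.
from fermipy.jobs.sys_interface import check_log
from fermipy.jobs.job_archive import JobStatus

status = check_log('logs/analyze-roi_target_0.log')
if status == JobStatus.done:
    print('job completed successfully')
elif status == JobStatus.failed:
    print('job failed')
```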
Implementation of the ScatterGather interface using the native system.
- class fermipy.jobs.native_impl.NativeInterface(**kwargs)[source]
Bases:
SysInterface
Implementation of ScatterGather that uses the native system
- dispatch_job_hook(link, key, job_config, logfile, stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>)[source]
Send a single job to be executed
- Parameters:
link (fermipy.jobs.chain.Link) – The link used to invoke the command we are running
key (str) – A string that identifies this particular instance of the job
job_config (dict) – A dictionary with the arguments for the job. Used with the self._command_template job template
logfile (str) – The logfile for this job; may be used to check for success or failure
- string_exited = 'Exited with exit code'
- string_successful = 'Successfully completed'
- fermipy.jobs.native_impl.get_native_default_args()[source]
Get the correct set of batch job arguments.
Implementation of the ScatterGather interface for dealing with LSF batch jobs at SLAC.
- class fermipy.jobs.slac_impl.SlacInterface(**kwargs)[source]
Bases:
SysInterface
Implementation of ScatterGather that uses LSF
- dispatch_job_hook(link, key, job_config, logfile, stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>)[source]
Send a single job to the LSF batch
- Parameters:
link (fermipy.jobs.chain.Link) – The link used to invoke the command we are running
key (str) – A string that identifies this particular instance of the job
job_config (dict) – A dictionary with the arguments for the job. Used with the self._command_template job template
logfile (str) – The logfile for this job; may be used to check for success or failure
- string_exited = 'Exited with exit code'
- string_successful = 'Successfully completed'
- fermipy.jobs.slac_impl.build_bsub_command(command_template, lsf_args)[source]
Build and return an LSF batch command template (a usage sketch follows this module's functions).
The structure will be ‘bsub -s <key> <value> <command_template>’, where <key> and <value> refer to items in lsf_args.
- fermipy.jobs.slac_impl.get_lsf_status()[source]
Count and print the number of jobs in various LSF states
- fermipy.jobs.slac_impl.get_slac_default_args(job_time=1500)[source]
Get the default set of batch job arguments for the SLAC LSF farm.
- Parameters:
job_time (int) – Expected max length of the job, in seconds. This is used to select the batch queue and set the job_check_sleep parameter that sets how often we check for job completion.
- fermipy.jobs.slac_impl.make_gpfs_path(path)[source]
Make a GPFS version of a file path. This just puts /gpfs at the beginning instead of /nfs.
- fermipy.jobs.slac_impl.make_nfs_path(path)[source]
Make an NFS version of a file path. This just puts /nfs at the beginning instead of /gpfs.
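A hedged sketch of these helpers; the command template and LSF option keys are placeholders, and the only documented structure is the ‘bsub -s <key> <value> <command_template>’ form above.

```python
# Build an LSF submission template and convert a path between the
# /gpfs and /nfs prefixes; argument values are placeholders.
from fermipy.jobs.slac_impl import build_bsub_command, make_nfs_path

template = build_bsub_command('fermipy-analyze-roi --config {config}',
                              dict(W='1500', R='rhel60'))
print(template)
print(make_nfs_path('/gpfs/slac/g/glast/user/work'))  # -> /nfs/slac/...
```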
Factory module to return the default interface to the batch farm
- fermipy.jobs.batch.get_batch_job_args(job_time=1500)[source]
Get the correct set of batch job arguments.
- fermipy.jobs.batch.get_batch_job_interface(job_time=1500)[source]
Create a batch job interface object.
- Parameters:
job_time (int) – Expected max length of the job, in seconds. This is used to select the batch queue and set the job_check_sleep parameter that sets how often we check for job completion.
- Returns:
job_interface – Object that manages interactions with the batch farm
- Return type:
SysInterface
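A short, hedged sketch of the factory functions above; which concrete SysInterface comes back depends on the system the code runs on.

```python
# Obtain the default batch arguments and interface for this system.
from fermipy.jobs.batch import get_batch_job_args, get_batch_job_interface

args = get_batch_job_args(job_time=1500)
interface = get_batch_job_interface(job_time=1500)
print(type(interface).__name__)  # e.g., NativeInterface or SlacInterface
```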
File Archive module
Classes and utilities to keep track of the files associated with an analysis.
The main class is FileArchive, which keeps track of all the files associated with an analysis.
The FileHandle helper class encapsulates information about a particular file.
- class fermipy.jobs.file_archive.FileArchive(**kwargs)[source]
Bases:
object
Class that keeps track of the status of files used in an analysis (a usage sketch follows this class entry)
- Parameters:
table_file (str) – Path to the file used to persist this FileArchive
table (astropy.table.Table) – Persistent representation of this FileArchive
cache (OrderedDict) – Transient representation of this FileArchive
base_path (str) – Base file path for all files in this FileArchive
- property base_path
Return the base file path for all files in this FileArchive.
- classmethod build_archive(**kwargs)[source]
Return the singleton FileArchive instance, building it if needed.
- property cache
Return the transient representation of this FileArchive.
- classmethod get_archive()[source]
Return the singleton FileArchive instance.
- get_file_ids(file_list, creator=None, status=0, file_dict=None)[source]
Get or create a list of file ids based on file names
- Parameters:
file_list (list) – The paths to the files
creator (int) – A unique key for the job that created these files
status (FileStatus) – Enumeration giving the current status of the files
file_dict (FileDict) – Mask giving flags set on these files
Returns a list of integers.
- get_file_paths(id_list)[source]
Get a list of file paths based on a set of ids
- Parameters:
id_list (list) – List of integer file keys
Returns a list of file paths.
- get_handle(filepath)[source]
Get the FileHandle object associated with a particular file.
- register_file(filepath, creator, status=0, flags=0)[source]
Register a file in the archive.
If the file already exists, this raises a KeyError.
- Parameters:
filepath (str) – The path to the file
creator (int) – A unique key for the job that created this file
status (FileStatus) – Enumeration giving the current status of the file
flags (FileFlags) – Enumeration giving the flags set on this file
Returns a FileHandle.
- property table
Return the persistent representation of this FileArchive.
- property table_file
Return the path to the file used to persist this FileArchive.
- update_file(filepath, creator, status)[source]
Update a file in the archive.
If the file does not exist, this raises a KeyError.
- Parameters:
filepath (str) – The path to the file
creator (int) – A unique key for the job that created this file
status (FileStatus) – Enumeration giving the current status of the file
Returns a FileHandle.
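A hedged usage sketch; the documented parameter names table_file and base_path are assumed to be accepted by build_archive, and all paths and the creator key are placeholders.

```python
# Register a file as expected output, then mark it as existing once
# the job has produced it; paths and the creator key are placeholders.
from fermipy.jobs.file_archive import FileArchive, FileStatus

archive = FileArchive.build_archive(table_file='file_archive.fits',
                                    base_path='.')
handle = archive.register_file('results/target_0_sed.fits', creator=0,
                               status=FileStatus.expected)
archive.update_file('results/target_0_sed.fits', creator=0,
                    status=FileStatus.exists)
```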
- class fermipy.jobs.file_archive.FileDict(**kwargs)[source]
Bases:
object
Small class to keep track of the files used and created by a link.
- property chain_input_files
Return a list of the input files needed by this chain.
For Link sub-classes this will return only those files that were not created by any internal Link.
- property chain_output_files
Return a list of all the output files produced by this link.
For Link sub-classes this will return only those files that were not marked as internal files or marked for removal.
- property gzip_files
Return a list of the files compressed by this link.
This returns all files that were explicitly marked for compression.
- property input_files
Return a list of the input files needed by this link.
For Link sub-classes this will return the union of all the input files of each internal Link.
That is to say, this will include files produced by one Link in a Chain and used as input to another Link in the Chain.
- property input_files_to_stage
Return a list of the input files needed by this link.
For Link sub-classes this will return the union of all the input files of each internal Link.
That is to say, this will include files produced by one Link in a Chain and used as input to another Link in the Chain.
- property internal_files
Return a list of the intermediate files produced by this link.
This returns all files that were explicitly marked as internal files.
- property output_files
Return a list of the output files produced by this link.
For Link sub-classes this will return the union of all the output files of each internal Link.
That is to say, this will include files produced by one Link in a Chain and used as input to another Link in the Chain.
- property output_files_to_stage
Return a list of the output files produced by this link.
For Link sub-classes this will return the union of all the output files of each internal Link.
That is to say, this will include files produced by one Link in a Chain and used as input to another Link in the Chain.
- print_chain_summary(stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>, indent='')[source]
Print a summary of the files in this file dict.
This version uses chain_input_files and chain_output_files to count the input and output files.
- print_summary(stream=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='utf-8'>, indent='')[source]
Print a summary of the files in this file dict.
This version explicitly counts the union of all input and output files.
- property temp_files
Return a list of the temporary files produced by this link.
This returns all files that were explicitly marked for removal.
- class fermipy.jobs.file_archive.FileFlags[source]
Bases:
object
Bit masks to indicate file types. The composite masks below are bitwise ORs of the base flags (see the sketch after this entry).
- gz_mask = 8
- in_ch_mask = 23
- in_stage_mask = 33
- input_mask = 1
- internal_mask = 16
- no_flags = 0
- out_ch_mask = 22
- out_stage_mask = 34
- output_mask = 2
- rm_mask = 4
- rmint_mask = 20
- stageable = 32
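For example, the composite values above decompose as follows; this check uses only the documented constants.

```python
# Verify that the composite masks are bitwise ORs of the base flags.
from fermipy.jobs.file_archive import FileFlags

assert FileFlags.in_ch_mask == (FileFlags.input_mask | FileFlags.output_mask
                                | FileFlags.rm_mask | FileFlags.internal_mask)   # 23
assert FileFlags.out_ch_mask == (FileFlags.output_mask | FileFlags.rm_mask
                                 | FileFlags.internal_mask)                      # 22
assert FileFlags.in_stage_mask == FileFlags.input_mask | FileFlags.stageable    # 33
assert FileFlags.out_stage_mask == FileFlags.output_mask | FileFlags.stageable  # 34
assert FileFlags.rmint_mask == FileFlags.rm_mask | FileFlags.internal_mask      # 20
```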
- class fermipy.jobs.file_archive.FileHandle(**kwargs)[source]
Bases:
object
Class to keep track of information about a file.
- append_to_table(table)[source]
Add this instance as a row in an astropy.table.Table.
- classmethod create_from_row(table_row)[source]
Build and return a FileHandle from an astropy.table.row.Row.
- classmethod make_dict(table)[source]
Build and return a dict of FileHandle objects from an astropy.table.Table.
The dictionary is keyed by FileHandle.key, which is a unique integer for each file.
- static make_table(file_dict)[source]
Build and return an astropy.table.Table to store FileHandle objects.
- update_table_row(table, row_idx)[source]
Update the values in an astropy.table.Table for this instance.
- class fermipy.jobs.file_archive.FileStageManager(scratchdir, workdir)[source]
Bases:
object
Small class to deal with staging files to and from a scratch area
- construct_scratch_path(dirname, basename)[source]
Construct and return a path in the scratch area.
This will be <self.scratchdir>/<dirname>/<basename>
- get_scratch_path(local_file)[source]
Construct and return a path in the scratch area from a local file.
- static make_scratch_dirs(file_mapping, dry_run=True)[source]
Make any directories needed in the scratch area
- map_files(local_files)[source]
Build a dictionary mapping local paths to scratch paths.
- Parameters:
local_files (list) – List of filenames to be mapped to scratch area
Returns a dict mapping local_file to the full path of the scratch file.
- split_local_path(local_file)[source]
Split the local path into a directory name and a file name
If local_file is in self.workdir or a subdirectory of it, the directory will consist of the relative path from workdir.
If local_file is not in self.workdir, directory will be empty.
Returns (dirname, basename)
- class fermipy.jobs.file_archive.FileStatus[source]
Bases:
object
Enumeration of file status types
- exists = 2
- expected = 1
- missing = 3
- no_file = 0
- superseded = 4
- temp_removed = 5
Job Archive module
Classes and utilities to keep track of the various jobs that are running in an analysis pipeline.
The main class is JobArchive, which keeps track of all the jobs associated with an analysis.
The JobDetails helper class encapsulates information about a single instance of running a job.
- class fermipy.jobs.job_archive.JobArchive(**kwargs)[source]
Bases:
object
Class that keeps track of all the jobs associated with an analysis.
- Parameters:
table_file (str) – Path to the file used to persist this JobArchive
table (astropy.table.Table) – Persistent representation of this JobArchive
table_ids (astropy.table.Table) – Ancillary table with information about file ids
file_archive (FileArchive) – Archive with information about all the files used and produced by this analysis
- classmethod build_archive(**kwargs)[source]
Return the singleton JobArchive instance, building it if needed.
- classmethod build_temp_job_archive()[source]
Build and return a JobArchive using default locations for the persistent files.
- property cache
Return the transient representation of this JobArchive.
- property file_archive
Return the FileArchive with information about all the files used and produced by this analysis.
- classmethod get_archive()[source]
Return the singleton JobArchive instance.
- get_details(jobname, jobkey)[source]
Get the JobDetails associated with a particular job instance.
- make_job_details(row_idx)[source]
Create a JobDetails object from an astropy.table.row.Row.
- register_job(job_details)[source]
Register a job in this JobArchive.
- register_job_from_link(link, key, **kwargs)[source]
Register a job in the JobArchive from a Link object.
- property table
Return the persistent representation of this JobArchive.
- property table_file
Return the path to the file used to persist this JobArchive.
- property table_ids
Return the persistent representation of the ancillary info of this JobArchive.
- update_job(job_details)[source]
Update a job in the JobArchive.
- class fermipy.jobs.job_archive.JobDetails(**kwargs)[source]
Bases:
object
A simple structure to keep track of the details of each of the sub-process jobs.
- Parameters:
dbkey (int) – A unique key to identify this job
jobname (str) – A name used to identify this job
jobkey (str) – A string to identify this instance of the job
appname (str) – The executable invoked to run the job
logfile (str) – The logfile for this job; may be used to check for success or failure
job_config (dict) – A dictionary with the arguments for the job
parent_id (int) – Unique key identifying the parent job
infile_ids (list of int) – Keys to identify input files to this job
outfile_ids (list of int) – Keys to identify output files from this job
rmfile_ids (list of int) – Keys to identify temporary files removed by this job
status (int) – Current job status, one of the enums above
- append_to_tables(table, table_ids)[source]
Add this instance as a row in an astropy.table.Table.
- check_status_logfile(checker_func)[source]
Check on the status of this particular job using the logfile
- classmethod create_from_row(table_row)[source]
Create a JobDetails object from an astropy.table.row.Row.
- property fullkey
Return the fullkey for this job: fullkey = <jobkey>@<jobname>.
- get_file_ids(file_archive, creator=None, status=0)[source]
Fill the file id arrays from the file lists
- Parameters:
file_archive (FileArchive) – Used to look up file ids
creator (int) – A unique key for the job that created these files
status (FileStatus) – Enumeration giving the current status of these files
- get_file_paths(file_archive, file_id_array)[source]
Get the full paths of the files used by this object from the id arrays
- Parameters:
file_archive (FileArchive) – Used to look up file ids
file_id_array (numpy.array) – Array that remaps the file indexes
- classmethod make_dict(table)[source]
Build a dictionary mapping int to JobDetails from an astropy.table.Table.
- static make_fullkey(jobname, jobkey='__top__')[source]
Combine jobname and jobkey to make a unique key: fullkey = <jobkey>@<jobname>.
- static split_fullkey(fullkey)[source]
Split a fullkey to extract the jobname and jobkey: fullkey = <jobkey>@<jobname> (see the sketch after this class entry).
- topkey = '__top__'
- update_table_row(table, row_idx)[source]
Add this instance as a row in an astropy.table.Table.
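A small sketch of the fullkey convention documented above; the job and target names are placeholders.

```python
# fullkey = <jobkey>@<jobname>, per the documented convention.
from fermipy.jobs.job_archive import JobDetails

fullkey = JobDetails.make_fullkey('analyze-roi', jobkey='target_0')
print(fullkey)  # 'target_0@analyze-roi'
parts = JobDetails.split_fullkey(fullkey)  # recover the components
```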
- class fermipy.jobs.job_archive.JobStatus[source]
Bases:
object
Enumeration of job status types
- done = 5
- failed = 6
- no_job = -1
- not_ready = 1
- partial_failed = 7
- pending = 3
- ready = 2
- removed = 8
- running = 4
- unknown = 0
- class fermipy.jobs.job_archive.JobStatusVector[source]
Bases:
object
Vector that counts the statuses of jobs and returns an overall status flag based on those counts
- property n_done
Return the number of successfully completed jobs
- property n_failed
Return the number of failed jobs
- property n_pending
Return the number of jobs submitted to the batch, but not yet running
- property n_running
Return the number of running jobs
- property n_total
Return the total number of jobs
- property n_waiting
Return the number of jobs in various waiting states