# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
Prepare data for diffuse all-sky analysis
"""
from __future__ import absolute_import, division, print_function
import os
import math
import yaml
from fermipy.jobs.utils import is_null
from fermipy.jobs.chain import Link
from fermipy.jobs.chain import Chain
from fermipy.jobs.scatter_gather import ScatterGather
from fermipy.jobs.slac_impl import make_nfs_path
from fermipy.diffuse.utils import create_inputlist
from fermipy.diffuse.name_policy import NameFactory
from fermipy.diffuse import defaults as diffuse_defaults
from fermipy.diffuse.binning import EVT_TYPE_DICT
from fermipy.diffuse.timefilter import MktimeFilterDict
from fermipy.diffuse.job_library import Gtlink_select, Gtlink_mktime,\
Gtlink_ltcube, Gtlink_bin, Gtltsum_SG, Gtexpcube2_SG
from fermipy.diffuse.gt_coadd_split import CoaddSplit_SG
NAME_FACTORY = NameFactory()
try:
MKTIME_DICT = MktimeFilterDict.build_from_yamlfile('config/mktime_filters.yaml')
except IOError:
MKTIME_DICT = MktimeFilterDict(aliases=dict(quality='lat_config==1&&data_qual>0'),
selections=dict(standard='{quality}'))
def make_full_path(basedir, outkey, origname):
"""Make a full file path by combining tokens
Parameters
-----------
basedir : str
The top level output area
outkey : str
The key for the particular instance of the analysis
origname : str
Template for the output file name
Returns
-------
outpath : str
This will be <basedir>:<outkey>:<newname>.fits
Where newname = origname.replace('.fits', '_<outkey>.fits')
"""
return os.path.join(basedir, outkey,
os.path.basename(origname).replace('.fits',
'_%s.fits' % outkey))
[docs]class SplitAndMktime(Chain):
"""Small class to split, apply mktime and bin data according to some user-provided specification
This chain consists multiple `Link` objects:
select-energy-EBIN-ZCUT : `Gtlink_select`
Initial splitting by energy bin and zenith angle cut
mktime-EBIN-ZCUT-FILTER : `Gtlink_mktime`
Application of gtmktime filter for zenith angle cut
ltcube-EBIN-ZCUT-FILTER : `Gtlink_ltcube`
Computation of livetime cube for zenith angle cut
select-type-EBIN-ZCUT-FILTER-TYPE : `Gtlink_select`
Refinement of selection from event types
bin-EBIN-ZCUT-FILTER-TYPE : `Gtlink_bin`
Final binning of the data for each event type
"""
appname = 'fermipy-split-and-mktime'
linkname_default = 'split-and-mktime'
usage = '%s [options]' % (appname)
description = 'Run gtselect and gtbin together'
default_options = dict(comp=diffuse_defaults.diffuse['comp'],
data=diffuse_defaults.diffuse['data'],
hpx_order_max=diffuse_defaults.diffuse['hpx_order_ccube'],
ft1file=diffuse_defaults.diffuse['ft1file'],
ft2file=diffuse_defaults.diffuse['ft2file'],
evclass=(128, 'Event class bit mask', int),
outdir=('counts_cubes', 'Output directory', str),
outkey=(None, 'Key for this particular output file', str),
pfiles=(None, 'Directory for .par files', str),
do_ltsum=(False, 'Sum livetime cube files', bool),
scratch=(None, 'Scratch area', str),
dry_run=(False, 'Print commands but do not run them', bool))
__doc__ += Link.construct_docstring(default_options)
def __init__(self, **kwargs):
"""C'tor
"""
super(SplitAndMktime, self).__init__(**kwargs)
self.comp_dict = None
def _map_arguments(self, args):
"""Map from the top-level arguments to the arguments provided to
the indiviudal links """
comp_file = args.get('comp', None)
datafile = args.get('data', None)
if is_null(comp_file):
return
if is_null(datafile):
return
NAME_FACTORY.update_base_dict(args['data'])
outdir = args.get('outdir')
outkey = args.get('outkey')
ft1file = args['ft1file']
ft2file = args['ft2file']
if is_null(outdir) or is_null(outkey):
return
pfiles = os.path.join(outdir, outkey)
self.comp_dict = yaml.safe_load(open(comp_file))
coordsys = self.comp_dict.pop('coordsys')
full_out_dir = make_nfs_path(os.path.join(outdir, outkey))
for key_e, comp_e in sorted(self.comp_dict.items()):
emin = math.pow(10., comp_e['log_emin'])
emax = math.pow(10., comp_e['log_emax'])
enumbins = comp_e['enumbins']
zmax = comp_e['zmax']
zcut = "zmax%i" % comp_e['zmax']
evclassstr = NAME_FACTORY.base_dict['evclass']
kwargs_select = dict(zcut=zcut,
ebin=key_e,
psftype='ALL',
coordsys=coordsys)
linkname = 'select-energy-%s-%s' % (key_e, zcut)
selectfile_energy = make_full_path(outdir, outkey, NAME_FACTORY.select(**kwargs_select))
self._set_link(linkname, Gtlink_select,
infile=ft1file,
outfile=selectfile_energy,
zmax=zmax,
emin=emin,
emax=emax,
evclass=NAME_FACTORY.evclassmask(evclassstr),
pfiles=pfiles,
logfile=os.path.join(full_out_dir, "%s.log" % linkname))
if 'mktimefilters' in comp_e:
mktimefilters = comp_e['mktimefilters']
else:
mktimefilters = ['none']
for mktimekey in mktimefilters:
kwargs_mktime = kwargs_select.copy()
kwargs_mktime['mktime'] = mktimekey
filterstring = MKTIME_DICT[mktimekey]
mktime_file = make_full_path(outdir, outkey, NAME_FACTORY.mktime(**kwargs_mktime))
ltcube_file = make_full_path(outdir, outkey, NAME_FACTORY.ltcube(**kwargs_mktime))
linkname_mktime = 'mktime-%s-%s-%s' % (key_e, zcut, mktimekey)
linkname_ltcube = 'ltcube-%s-%s-%s' % (key_e, zcut, mktimekey)
self._set_link(linkname_mktime, Gtlink_mktime,
evfile=selectfile_energy,
outfile=mktime_file,
scfile=ft2file,
filter=filterstring,
pfiles=pfiles,
logfile=os.path.join(full_out_dir, "%s.log" % linkname_mktime))
self._set_link(linkname_ltcube, Gtlink_ltcube,
evfile=mktime_file,
outfile=ltcube_file,
scfile=ft2file,
zmax=zmax,
pfiles=pfiles,
logfile=os.path.join(full_out_dir, "%s.log" % linkname_ltcube))
if 'evtclasses' in comp_e:
evtclasslist_vals = comp_e['evtclasses']
else:
evtclasslist_vals = [NAME_FACTORY.base_dict['evclass']]
for evtclassval in evtclasslist_vals:
for psf_type, psf_dict in sorted(comp_e['psf_types'].items()):
linkname_select = 'select-type-%s-%s-%s-%s-%s' % (
key_e, zcut, mktimekey, evtclassval, psf_type)
linkname_bin = 'bin-%s-%s-%s-%s-%s' % (key_e,
zcut, mktimekey,
evtclassval, psf_type)
kwargs_bin = kwargs_mktime.copy()
kwargs_bin['psftype'] = psf_type
kwargs_bin['coordsys'] = coordsys
kwargs_bin['evclass'] = evtclassval
selectfile_psf = make_full_path(
outdir, outkey, NAME_FACTORY.select(**kwargs_bin))
binfile_psf = make_full_path(
outdir, outkey, NAME_FACTORY.ccube(**kwargs_bin))
hpx_order_psf = min(args['hpx_order_max'], psf_dict['hpx_order'])
linkname_select = 'select-type-%s-%s-%s-%s-%s' % (key_e, zcut,
mktimekey, evtclassval,
psf_type)
linkname_bin = 'bin-%s-%s-%s-%s-%s' % (key_e, zcut, mktimekey,
evtclassval, psf_type)
self._set_link(linkname_select, Gtlink_select,
infile=selectfile_energy,
outfile=selectfile_psf,
zmax=zmax,
emin=emin,
emax=emax,
evtype=EVT_TYPE_DICT[psf_type],
evclass=NAME_FACTORY.evclassmask(evtclassval),
pfiles=pfiles,
logfile=os.path.join(full_out_dir, "%s.log" % linkname_select))
self._set_link(linkname_bin, Gtlink_bin,
coordsys=coordsys,
hpx_order=hpx_order_psf,
evfile=selectfile_psf,
outfile=binfile_psf,
emin=emin,
emax=emax,
enumbins=enumbins,
pfiles=pfiles,
logfile=os.path.join(full_out_dir, "%s.log" % linkname_bin))
[docs]class SplitAndMktime_SG(ScatterGather):
"""Small class to generate configurations for SplitAndMktime
"""
appname = 'fermipy-split-and-mktime-sg'
usage = "%s [options]" % (appname)
description = "Prepare data for diffuse all-sky analysis"
clientclass = SplitAndMktime
job_time = 1500
default_options = dict(comp=diffuse_defaults.diffuse['comp'],
data=diffuse_defaults.diffuse['data'],
hpx_order_max=diffuse_defaults.diffuse['hpx_order_ccube'],
ft1file=diffuse_defaults.diffuse['ft1file'],
ft2file=diffuse_defaults.diffuse['ft2file'],
do_ltsum=diffuse_defaults.diffuse['do_ltsum'],
scratch=diffuse_defaults.diffuse['scratch'],
dry_run=diffuse_defaults.diffuse['dry_run'])
__doc__ += Link.construct_docstring(default_options)
[docs] def build_job_configs(self, args):
"""Hook to build job configurations
"""
job_configs = {}
comp_file = args.get('comp', None)
if comp_file is not None:
comp_dict = yaml.safe_load(open(comp_file))
coordsys = comp_dict.pop('coordsys')
for v in comp_dict.values():
v['coordsys'] = coordsys
else:
return job_configs
datafile = args['data']
if datafile is None or datafile == 'None':
return job_configs
NAME_FACTORY.update_base_dict(args['data'])
inputfiles = create_inputlist(args['ft1file'])
outdir_base = os.path.join(NAME_FACTORY.base_dict['basedir'], 'counts_cubes')
data_ver = NAME_FACTORY.base_dict['data_ver']
for idx, infile in enumerate(inputfiles):
key = "%06i" % idx
key_scfile = "%03i" % (idx + 1)
output_dir = os.path.join(outdir_base, key)
try:
os.mkdir(output_dir)
except OSError:
pass
scfile = args['ft2file'].replace('.lst', '_%s.fits' % key_scfile)
logfile = make_nfs_path(os.path.join(output_dir,
'scatter_mk_%s_%s.log' % (data_ver, key)))
job_configs[key] = comp_dict.copy()
job_configs[key].update(dict(ft1file=infile,
scfile=scfile,
comp=args['comp'],
hpx_order_max=args['hpx_order_max'],
outdir=outdir_base,
outkey=key,
logfile=logfile,
pfiles=output_dir))
return job_configs
[docs]class SplitAndMktimeChain(Chain):
"""Chain to run split and mktime and then make livetime and exposure cubes
This chain consists of:
split-and-mktime : `SplitAndMkTime_SG`
Chain to make the binned counts maps for each input file
coadd-split : `CoaddSplit_SG`
Link to co-add the binnec counts maps files
ltsum : `Gtltsum_SG`
Link to co-add the livetime cube files
expcube2 : `Gtexpcube2_SG`
Link to make the corresponding binned exposure maps
"""
appname = 'fermipy-split-and-mktime-chain'
linkname_default = 'split-and-mktime-chain'
usage = '%s [options]' % (appname)
description = 'Run split-and-mktime, coadd-split and exposure'
default_options = dict(data=diffuse_defaults.diffuse['data'],
comp=diffuse_defaults.diffuse['comp'],
ft1file=diffuse_defaults.diffuse['ft1file'],
ft2file=diffuse_defaults.diffuse['ft2file'],
hpx_order_ccube=diffuse_defaults.diffuse['hpx_order_ccube'],
hpx_order_expcube=diffuse_defaults.diffuse['hpx_order_expcube'],
do_ltsum=diffuse_defaults.diffuse['do_ltsum'],
scratch=diffuse_defaults.diffuse['scratch'],
dry_run=diffuse_defaults.diffuse['dry_run'])
__doc__ += Link.construct_docstring(default_options)
def __init__(self, **kwargs):
"""C'tor
"""
super(SplitAndMktimeChain, self).__init__(**kwargs)
self.comp_dict = None
def _map_arguments(self, args):
"""Map from the top-level arguments to the arguments provided to
the indiviudal links """
data = args.get('data')
comp = args.get('comp')
ft1file = args.get('ft1file')
ft2file = args.get('ft2file')
scratch = args.get('scratch', None)
dry_run = args.get('dry_run', None)
self._set_link('split-and-mktime', SplitAndMktime_SG,
comp=comp, data=data,
hpx_order_max=args.get('hpx_order_ccube', 9),
ft1file=ft1file,
ft2file=ft2file,
do_ltsum=args.get('do_ltsum', False),
scratch=scratch,
dry_run=dry_run)
self._set_link('coadd-split', CoaddSplit_SG,
comp=comp, data=data,
ft1file=ft1file)
self._set_link('ltsum', Gtltsum_SG,
comp=comp, data=data,
ft1file=args['ft1file'],
dry_run=dry_run)
self._set_link('expcube2', Gtexpcube2_SG,
comp=comp, data=data,
hpx_order_max=args.get('hpx_order_expcube', 5),
dry_run=dry_run)
def register_classes():
"""Register these classes with the `LinkFactory` """
SplitAndMktime.register_class()
SplitAndMktime_SG.register_class()
SplitAndMktimeChain.register_class()