Source code for fermipy.diffuse.gt_split_and_bin

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
Prepare data for diffuse all-sky analysis
"""
from __future__ import absolute_import, division, print_function

import os
import math

import yaml

from fermipy.jobs.utils import is_null
from fermipy.jobs.link import Link
from fermipy.jobs.chain import Chain
from fermipy.jobs.scatter_gather import ScatterGather
from fermipy.jobs.slac_impl import make_nfs_path

from fermipy.diffuse.utils import create_inputlist
from fermipy.diffuse.name_policy import NameFactory
from fermipy.diffuse import defaults as diffuse_defaults
from fermipy.diffuse.binning import EVT_TYPE_DICT

from fermipy.diffuse.job_library import Gtlink_select,\
    Gtlink_bin, Gtexpcube2_SG

from fermipy.diffuse.gt_coadd_split import CoaddSplit_SG


NAME_FACTORY = NameFactory()


def make_full_path(basedir, outkey, origname):
    """Make a full file path"""
    return os.path.join(basedir, outkey, os.path.basename(
        origname).replace('.fits', '_%s.fits' % outkey))


[docs]class SplitAndBin(Chain): """Small class to split and bin data according to some user-provided specification This chain consists multiple `Link` objects: select-energy-EBIN-ZCUT : `Gtlink_select` Initial splitting by energy bin and zenith angle cut select-type-EBIN-ZCUT-FILTER-TYPE : `Gtlink_select` Refinement of selection from event types bin-EBIN-ZCUT-FILTER-TYPE : `Gtlink_bin` Final binning of the data for each event type """ appname = 'fermipy-split-and-bin' linkname_default = 'split-and-bin' usage = '%s [options]' % (appname) description = 'Run gtselect and gtbin together' default_options = dict(data=diffuse_defaults.diffuse['data'], comp=diffuse_defaults.diffuse['comp'], hpx_order_max=diffuse_defaults.diffuse['hpx_order_ccube'], ft1file=(None, 'Input FT1 file', str), evclass=(128, 'Event class bit mask', int), outdir=('counts_cubes_cr', 'Base name for output files', str), outkey=(None, 'Key for this particular output file', str), pfiles=(None, 'Directory for .par files', str), scratch=(None, 'Scratch area', str), dry_run=(False, 'Print commands but do not run them', bool)) __doc__ += Link.construct_docstring(default_options) def __init__(self, **kwargs): """C'tor """ super(SplitAndBin, self).__init__(**kwargs) self.comp_dict = None def _map_arguments(self, args): """Map from the top-level arguments to the arguments provided to the indiviudal links """ comp_file = args.get('comp', None) datafile = args.get('data', None) if is_null(comp_file): return if is_null(datafile): return NAME_FACTORY.update_base_dict(datafile) outdir = args.get('outdir', None) outkey = args.get('outkey', None) ft1file = args['ft1file'] if is_null(outdir) or is_null(outkey): return pfiles = os.path.join(outdir, outkey) self.comp_dict = yaml.safe_load(open(comp_file)) coordsys = self.comp_dict.pop('coordsys') full_out_dir = make_nfs_path(os.path.join(outdir, outkey)) for key_e, comp_e in sorted(self.comp_dict.items()): emin = math.pow(10., comp_e['log_emin']) emax = math.pow(10., comp_e['log_emax']) enumbins = comp_e['enumbins'] zmax = comp_e['zmax'] zcut = "zmax%i" % comp_e['zmax'] evclassstr = NAME_FACTORY.base_dict['evclass'] kwargs_select = dict(zcut=zcut, ebin=key_e, psftype='ALL', coordsys=coordsys, mktime='none') selectfile_energy = make_full_path(outdir, outkey, NAME_FACTORY.select(**kwargs_select)) linkname = 'select-energy-%s-%s' % (key_e, zcut) self._set_link(linkname, Gtlink_select, infile=ft1file, outfile=selectfile_energy, zmax=zmax, emin=emin, emax=emax, evclass=NAME_FACTORY.evclassmask(evclassstr), pfiles=pfiles, logfile=os.path.join(full_out_dir, "%s.log" % linkname)) if 'evtclasses' in comp_e: evtclasslist_vals = comp_e['evtclasses'] else: evtclasslist_vals = [NAME_FACTORY.base_dict['evclass']] for evtclassval in evtclasslist_vals: for psf_type, psf_dict in sorted(comp_e['psf_types'].items()): linkname_select = 'select-type-%s-%s-%s-%s' % ( key_e, zcut, evtclassval, psf_type) linkname_bin = 'bin-%s-%s-%s-%s' % (key_e, zcut, evtclassval, psf_type) hpx_order = psf_dict['hpx_order'] kwargs_bin = kwargs_select.copy() kwargs_bin['psftype'] = psf_type selectfile_psf = make_full_path( outdir, outkey, NAME_FACTORY.select(**kwargs_bin)) binfile = make_full_path(outdir, outkey, NAME_FACTORY.ccube(**kwargs_bin)) self._set_link(linkname_select, Gtlink_select, infile=selectfile_energy, outfile=selectfile_psf, zmax=zmax, emin=emin, emax=emax, evtype=EVT_TYPE_DICT[psf_type], evclass=NAME_FACTORY.evclassmask(evtclassval), pfiles=pfiles, logfile=os.path.join(full_out_dir, "%s.log" % linkname_select)) self._set_link(linkname_bin, Gtlink_bin, coordsys=coordsys, hpx_order=hpx_order, evfile=selectfile_psf, outfile=binfile, emin=emin, emax=emax, enumbins=enumbins, pfiles=pfiles, logfile=os.path.join(full_out_dir, "%s.log" % linkname_bin))
[docs]class SplitAndBin_SG(ScatterGather): """Small class to generate configurations for SplitAndBin """ appname = 'fermipy-split-and-bin-sg' usage = "%s [options]" % (appname) description = "Prepare data for diffuse all-sky analysis" clientclass = SplitAndBin job_time = 1500 default_options = dict(comp=diffuse_defaults.diffuse['comp'], data=diffuse_defaults.diffuse['data'], hpx_order_max=diffuse_defaults.diffuse['hpx_order_ccube'], ft1file=(None, 'Input FT1 file', str), scratch=(None, 'Path to scratch area', str)) __doc__ += Link.construct_docstring(default_options)
[docs] def build_job_configs(self, args): """Hook to build job configurations """ job_configs = {} comp_file = args.get('comp', None) if comp_file is not None: comp_dict = yaml.safe_load(open(comp_file)) coordsys = comp_dict.pop('coordsys') for v in comp_dict.values(): v['coordsys'] = coordsys else: return job_configs NAME_FACTORY.update_base_dict(args['data']) inputfiles = create_inputlist(args['ft1file']) outdir_base = os.path.join(NAME_FACTORY.base_dict['basedir'], 'counts_cubes') for idx, infile in enumerate(inputfiles): key = "%06i" % idx output_dir = os.path.join(outdir_base, key) try: os.mkdir(output_dir) except OSError: pass logfile = make_nfs_path(os.path.join(output_dir, 'scatter_%s.log' % key)) #job_configs[key] = args.copy() job_configs[key] = args.copy() job_configs[key].update(dict(ft1file=infile, comp=args['comp'], hpx_order_max=args['hpx_order_max'], outdir=outdir_base, outkey=key, logfile=logfile, pfiles=output_dir)) return job_configs
[docs]class SplitAndBinChain(Chain): """Chain to run split and bin and then make exposure cubes This chain consists of: split-and-bin : `SplitAndBin_SG` Chain to make the binned counts maps for each input file coadd-split : `CoaddSplit_SG` Link to co-add the binnec counts maps files expcube2 : `Gtexpcube2_SG` Link to make the corresponding binned exposure maps """ appname = 'fermipy-split-and-bin-chain' linkname_default = 'split-and-bin-chain' usage = '%s [options]' % (appname) description = 'Run split-and-bin, coadd-split and exposure' default_options = dict(data=diffuse_defaults.diffuse['data'], comp=diffuse_defaults.diffuse['comp'], ft1file=diffuse_defaults.diffuse['ft1file'], hpx_order_ccube=diffuse_defaults.diffuse['hpx_order_ccube'], hpx_order_expcube=diffuse_defaults.diffuse['hpx_order_expcube'], scratch=diffuse_defaults.diffuse['scratch'], dry_run=diffuse_defaults.diffuse['dry_run']) __doc__ += Link.construct_docstring(default_options) def _map_arguments(self, args): """Map from the top-level arguments to the arguments provided to the indiviudal links """ data = args.get('data') comp = args.get('comp') ft1file = args.get('ft1file') scratch = args.get('scratch', None) dry_run = args.get('dry_run', None) self._set_link('split-and-bin', SplitAndBin_SG, comp=comp, data=data, hpx_order_max=args.get('hpx_order_ccube', 9), ft1file=ft1file, scratch=scratch, dry_run=dry_run) self._set_link('coadd-split', CoaddSplit_SG, comp=comp, data=data, ft1file=ft1file) self._set_link('expcube2', Gtexpcube2_SG, comp=comp, data=data, hpx_order_max=args.get('hpx_order_expcube', 5), dry_run=dry_run)
def register_classes(): """Register these classes with the `LinkFactory` """ SplitAndBin.register_class() SplitAndBin_SG.register_class() SplitAndBinChain.register_class()