
# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
Merge source maps to build composite sources
"""
from __future__ import absolute_import, division, print_function

import os
import sys

import yaml

from astropy.io import fits
from fermipy.skymap import HpxMap

from fermipy.utils import load_yaml

from fermipy.jobs.scatter_gather import ScatterGather
from fermipy.jobs.slac_impl import make_nfs_path
from fermipy.jobs.link import Link
from fermipy.jobs.chain import Chain

from fermipy.diffuse.binning import Component
from fermipy.diffuse.name_policy import NameFactory
from fermipy.diffuse import defaults as diffuse_defaults
from fermipy.diffuse.model_manager import make_library

NAME_FACTORY = NameFactory()


class InitModel(Link):
    """Small class to prepare files for a fermipy analysis.

    Specifically, this creates the srcmap_manifest and fermipy_config_yaml files.
    """
    appname = 'fermipy-init-model'
    linkname_default = 'init-model'
    usage = '%s [options]' % (appname)
    description = "Initialize model fitting directory"

    default_options = dict(comp=diffuse_defaults.diffuse['comp'],
                           data=diffuse_defaults.diffuse['data'],
                           library=diffuse_defaults.diffuse['library'],
                           models=diffuse_defaults.diffuse['models'],
                           hpx_order=diffuse_defaults.diffuse['hpx_order_fitting'])

    def run_analysis(self, argv):
        """Build the manifest for all the models"""
        args = self._parser.parse_args(argv)

        components = Component.build_from_yamlfile(args.comp)
        NAME_FACTORY.update_base_dict(args.data)

        model_dict = make_library(**args.__dict__)
        model_manager = model_dict['ModelManager']
        models = load_yaml(args.models)
        data = args.data
        hpx_order = args.hpx_order
        for modelkey in models:
            model_manager.make_srcmap_manifest(modelkey, components, data)
            model_manager.make_fermipy_config_yaml(modelkey, components, data,
                                                   hpx_order=hpx_order,
                                                   irf_ver=NAME_FACTORY.irf_ver())
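
# Illustrative usage sketch (not part of the original module): `InitModel` is
# typically driven through a command-line entry point named after `appname`,
# with option names taken from `default_options` above.  The yaml file paths
# below are hypothetical placeholders.
#
#     fermipy-init-model --comp config/binning.yaml \
#                        --data config/dataset.yaml \
#                        --library models/library.yaml \
#                        --models config/models.yaml \
#                        --hpx_order 9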


class AssembleModel(Link):
    """Small class to assemble source map files for fermipy analysis.

    This is useful for re-merging after parallelizing source map creation.
    """
    appname = 'fermipy-assemble-model'
    linkname_default = 'assemble-model'
    usage = '%s [options]' % (appname)
    description = "Assemble sourcemaps for model fitting"

    default_options = dict(input=(None, 'Input yaml file', str),
                           compname=(None, 'Component name.', str),
                           hpx_order=diffuse_defaults.diffuse['hpx_order_fitting'])

    @staticmethod
    def copy_ccube(ccube, outsrcmap, hpx_order):
        """Copy a counts cube into the outsrcmap file,
        reducing the HEALPix order to hpx_order if needed.
        """
        sys.stdout.write("  Copying counts cube from %s to %s\n" % (ccube, outsrcmap))
        try:
            hdulist_in = fits.open(ccube)
        except IOError:
            hdulist_in = fits.open("%s.gz" % ccube)

        hpx_order_in = hdulist_in[1].header['ORDER']
        if hpx_order_in > hpx_order:
            hpxmap = HpxMap.create_from_hdulist(hdulist_in)
            hpxmap_out = hpxmap.ud_grade(hpx_order, preserve_counts=True)
            hpxlist_out = hdulist_in
            #hpxlist_out['SKYMAP'] = hpxmap_out.create_image_hdu()
            hpxlist_out[1] = hpxmap_out.create_image_hdu()
            hpxlist_out[1].name = 'SKYMAP'
            hpxlist_out.writeto(outsrcmap)
            return hpx_order
        else:
            os.system('cp %s %s' % (ccube, outsrcmap))
            #os.system('cp %s.gz %s.gz' % (ccube, outsrcmap))
            #os.system('gunzip -f %s.gz' % (outsrcmap))
            return None
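
    # Illustrative sketch (not in the original module): `copy_ccube` returns
    # the new HEALPix order when the input cube had to be downgraded, and None
    # when it was simply copied.  The file names here are hypothetical.
    #
    #     new_order = AssembleModel.copy_ccube('ccube_E0_PSF3.fits',
    #                                          'srcmap_E0_PSF3.fits',
    #                                          hpx_order=9)
    #     # new_order is 9 if the input order was > 9, otherwise None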

    @staticmethod
    def open_outsrcmap(outsrcmap):
        """Open and return the outsrcmap file in append mode"""
        outhdulist = fits.open(outsrcmap, 'append')
        return outhdulist

    @staticmethod
    def append_hdus(hdulist, srcmap_file, source_names, hpx_order):
        """Append HEALPix maps to a list

        Parameters
        ----------

        hdulist : list
            The list being appended to

        srcmap_file : str
            Path to the file containing the HDUs

        source_names : list of str
            Names of the sources to extract from srcmap_file

        hpx_order : int
            Maximum order for maps
        """
        sys.stdout.write("  Extracting %i sources from %s" % (len(source_names), srcmap_file))
        try:
            hdulist_in = fits.open(srcmap_file)
        except IOError:
            try:
                hdulist_in = fits.open('%s.gz' % srcmap_file)
            except IOError:
                sys.stdout.write("  Missing file %s\n" % srcmap_file)
                return

        for source_name in source_names:
            sys.stdout.write('.')
            sys.stdout.flush()
            if hpx_order is None:
                hdulist.append(hdulist_in[source_name])
            else:
                try:
                    hpxmap = HpxMap.create_from_hdulist(hdulist_in, hdu=source_name)
                except IndexError:
                    print("  Index error on source %s in file %s" % (source_name, srcmap_file))
                    continue
                except KeyError:
                    print("  Key error on source %s in file %s" % (source_name, srcmap_file))
                    continue
                hpxmap_out = hpxmap.ud_grade(hpx_order, preserve_counts=True)
                hdulist.append(hpxmap_out.create_image_hdu(name=source_name))
        sys.stdout.write("\n")
        hdulist.flush()
        hdulist_in.close()

    @staticmethod
    def assemble_component(compname, compinfo, hpx_order):
        """Assemble the source map file for one binning component

        Parameters
        ----------

        compname : str
            The key for this component (e.g., E0_PSF3)

        compinfo : dict
            Information about this component

        hpx_order : int
            Maximum order for maps
        """
        sys.stdout.write("Working on component %s\n" % compname)
        ccube = compinfo['ccube']
        outsrcmap = compinfo['outsrcmap']
        source_dict = compinfo['source_dict']

        hpx_order = AssembleModel.copy_ccube(ccube, outsrcmap, hpx_order)
        hdulist = AssembleModel.open_outsrcmap(outsrcmap)

        for comp_name in sorted(source_dict.keys()):
            source_info = source_dict[comp_name]
            source_names = source_info['source_names']
            srcmap_file = source_info['srcmap_file']
            AssembleModel.append_hdus(hdulist, srcmap_file,
                                      source_names, hpx_order)
        sys.stdout.write("Done!\n")
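
    # Illustrative sketch (not in the original module): `assemble_component`
    # expects a `compinfo` dict shaped like one entry of the srcmap manifest.
    # The keys shown are the ones read above; the values are hypothetical.
    #
    #     compinfo = dict(ccube='ccube_E0_PSF3.fits',
    #                     outsrcmap='srcmap_E0_PSF3.fits',
    #                     source_dict={'galactic':
    #                                  dict(source_names=['gll_iem'],
    #                                       srcmap_file='srcmap_gal_E0_PSF3.fits')})
    #     AssembleModel.assemble_component('E0_PSF3', compinfo, hpx_order=9)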

    def run_analysis(self, argv):
        """Assemble the source map file for one binning component"""
        args = self._parser.parse_args(argv)
        manifest = yaml.safe_load(open(args.input))

        compname = args.compname
        value = manifest[compname]
        self.assemble_component(compname, value, args.hpx_order)
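

# Illustrative sketch only (not part of the original module): assembling a
# single binning component directly from a srcmap manifest written by
# `InitModel`.  The model name, component key and paths are hypothetical
# placeholders.
def _example_assemble_one_component():
    """Assemble one component from a hypothetical manifest file."""
    manifest_path = 'analysis/model_baseline/srcmap_manifest_baseline.yaml'
    with open(manifest_path) as fin:
        manifest = yaml.safe_load(fin)
    # Each manifest entry carries the 'ccube', 'outsrcmap' and 'source_dict'
    # keys read by AssembleModel.assemble_component above.
    AssembleModel.assemble_component('E0_PSF3', manifest['E0_PSF3'], hpx_order=9)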


class AssembleModel_SG(ScatterGather):
    """Small class to generate configurations for this script

    Parameters
    ----------

    --comp   : binning component definition yaml file
    --data   : dataset definition yaml file
    --models : model definition yaml file

    args : Names of models to assemble source maps for
    """
    appname = 'fermipy-assemble-model-sg'
    usage = "%s [options]" % (appname)
    description = "Copy source maps from the library to an analysis directory"
    clientclass = AssembleModel

    job_time = 300

    default_options = dict(comp=diffuse_defaults.diffuse['comp'],
                           data=diffuse_defaults.diffuse['data'],
                           hpx_order=diffuse_defaults.diffuse['hpx_order_fitting'],
                           models=diffuse_defaults.diffuse['models'])

    def build_job_configs(self, args):
        """Hook to build job configurations"""
        job_configs = {}

        components = Component.build_from_yamlfile(args['comp'])
        NAME_FACTORY.update_base_dict(args['data'])

        models = load_yaml(args['models'])

        for modelkey in models:
            manifest = os.path.join('analysis', 'model_%s' % modelkey,
                                    'srcmap_manifest_%s.yaml' % modelkey)
            for comp in components:
                key = comp.make_key('{ebin_name}_{evtype_name}')
                fullkey = "%s_%s" % (modelkey, key)
                outfile = NAME_FACTORY.merged_srcmaps(modelkey=modelkey,
                                                      component=key,
                                                      coordsys=comp.coordsys,
                                                      mktime='none',
                                                      irf_ver=NAME_FACTORY.irf_ver())
                logfile = make_nfs_path(outfile.replace('.fits', '.log'))
                job_configs[fullkey] = dict(input=manifest,
                                            compname=key,
                                            hpx_order=args['hpx_order'],
                                            logfile=logfile)

        return job_configs
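

# Illustrative sketch (not part of the original module): `build_job_configs`
# takes a plain dict of the options listed in `default_options` above and
# returns one job configuration per (model, component) pair, keyed as
# '<modelkey>_<ebin_name>_<evtype_name>'.  The yaml paths are hypothetical.
#
#     args = dict(comp='config/binning.yaml',
#                 data='config/dataset.yaml',
#                 models='config/models.yaml',
#                 hpx_order=9)
#     # build_job_configs(args) would then return entries keyed like
#     # 'baseline_E0_PSF3', each holding input, compname, hpx_order and logfile.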


class AssembleModelChain(Chain):
    """Small chain to initialize a model-fitting directory and assemble the
    source maps needed for fitting.
    """
    appname = 'fermipy-assemble-model-chain'
    linkname_default = 'assemble-model-chain'
    usage = '%s [options]' % (appname)
    description = 'Run init-model and assemble-model'

    default_options = dict(data=diffuse_defaults.diffuse['data'],
                           comp=diffuse_defaults.diffuse['comp'],
                           library=diffuse_defaults.diffuse['library'],
                           models=diffuse_defaults.diffuse['models'],
                           hpx_order=diffuse_defaults.diffuse['hpx_order_fitting'],
                           dry_run=diffuse_defaults.diffuse['dry_run'])

    def __init__(self, **kwargs):
        """C'tor
        """
        super(AssembleModelChain, self).__init__(**kwargs)
        self.comp_dict = None

    def _register_link_classes(self):
        InitModel.register_class()
        AssembleModel_SG.register_class()

    def _map_arguments(self, input_dict):
        """Map from the top-level arguments to the arguments provided to
        the individual links
        """
        data = input_dict.get('data')
        comp = input_dict.get('comp')
        library = input_dict.get('library')
        models = input_dict.get('models')
        hpx_order = input_dict.get('hpx_order')
        dry_run = input_dict.get('dry_run', False)

        self._set_link('init-model', InitModel,
                       comp=comp, data=data,
                       library=library,
                       models=models,
                       hpx_order=hpx_order,
                       dry_run=dry_run)

        self._set_link('assemble-model', AssembleModel_SG,
                       comp=comp, data=data,
                       hpx_order=hpx_order,
                       models=models)
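

# Illustrative usage sketch (not part of the original module): running the
# full chain, assuming the `fermipy-assemble-model-chain` entry point declared
# by `appname` is installed.  The yaml file paths are hypothetical placeholders.
#
#     fermipy-assemble-model-chain --comp config/binning.yaml \
#                                  --data config/dataset.yaml \
#                                  --library models/library.yaml \
#                                  --models config/models.yaml \
#                                  --hpx_order 9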


def register_classes():
    """Register these classes with the `LinkFactory`"""
    InitModel.register_class()
    AssembleModel.register_class()
    AssembleModel_SG.register_class()
    AssembleModelChain.register_class()