# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
Merge source maps to build composite sources
"""
from __future__ import absolute_import, division, print_function
import os
import BinnedAnalysis as BinnedAnalysis
import pyLikelihood as pyLike
from fermipy.jobs.file_archive import FileFlags
from fermipy.jobs.link import Link
from fermipy.jobs.scatter_gather import ScatterGather
from fermipy.jobs.slac_impl import make_nfs_path
from fermipy.diffuse.name_policy import NameFactory
from fermipy.diffuse.binning import Component
from fermipy.diffuse.catalog_src_manager import make_catalog_comp_dict
from fermipy.diffuse import defaults as diffuse_defaults
# Module-level NameFactory instance.  MergeSrcmaps_SG.build_job_configs
# updates its base dictionary from the 'data' option and then uses it to
# build the file names placed in every job configuration.
NAME_FACTORY = NameFactory()
class GtMergeSrcmaps(Link):
    """Small class to merge source maps for composite sources.

    This is useful for parallelizing source map creation.
    """
    # Source model handed to BinnedAnalysis before the real sources are added.
    NULL_MODEL = 'srcmdls/null.xml'

    appname = 'fermipy-merge-srcmaps'
    linkname_default = 'merge-srcmaps'
    usage = '%s [options]' % (appname)
    description = "Merge source maps from a set of sources"

    default_options = dict(irfs=diffuse_defaults.gtopts['irfs'],
                           expcube=diffuse_defaults.gtopts['expcube'],
                           bexpmap=diffuse_defaults.gtopts['bexpmap'],
                           srcmaps=diffuse_defaults.gtopts['srcmaps'],
                           srcmdl=diffuse_defaults.gtopts['srcmdl'],
                           outfile=diffuse_defaults.gtopts['outfile'],
                           merged=(None, 'Name of merged source', str),
                           outxml=(None, 'Output source model xml file', str),
                           gzip=(False, 'Compress output file', bool))

    # NOTE(review): 'cmap' has no matching entry in default_options, while
    # 'srcmaps' (read by run_analysis) has no flag here -- confirm whether
    # 'srcmaps' was intended.
    default_file_args = dict(expcube=FileFlags.input_mask,
                             cmap=FileFlags.input_mask,
                             bexpmap=FileFlags.input_mask,
                             srcmdl=FileFlags.input_mask,
                             outfile=FileFlags.output_mask,
                             outxml=FileFlags.output_mask)

    __doc__ += Link.construct_docstring(default_options)

    def run_analysis(self, argv):
        """Run this analysis

        Reads the sources listed in the ``srcmdl`` XML file, adds each one
        to a binned likelihood object that starts from `NULL_MODEL`, merges
        the sources that were added into a single source named ``merged``,
        and writes the source map file (``outfile``, optionally gzipped)
        and the XML model (``outxml``).

        Parameters
        ----------
        argv : list
            Command line arguments, parsed with ``self._parser``.
        """
        # Local import: keeps this block self-contained without touching the
        # module-level import list.
        import subprocess

        args = self._parser.parse_args(argv)
        obs = BinnedAnalysis.BinnedObs(irfs=args.irfs,
                                       expCube=args.expcube,
                                       srcMaps=args.srcmaps,
                                       binnedExpMap=args.bexpmap)

        like = BinnedAnalysis.BinnedAnalysis(obs,
                                             optimizer='MINUIT',
                                             srcModel=GtMergeSrcmaps.NULL_MODEL,
                                             wmap=None)

        like.logLike.set_use_single_fixed_map(False)

        print("Reading xml model from %s" % args.srcmdl)
        source_factory = pyLike.SourceFactory(obs.observation)
        source_factory.readXml(args.srcmdl, BinnedAnalysis._funcFactory, False, True, True)
        strv = pyLike.StringVector()
        source_factory.fetchSrcNames(strv)
        source_names = [strv[i] for i in range(strv.size())]

        missing_sources = []
        srcs_to_merge = []
        for source_name in source_names:
            try:
                source = source_factory.releaseSource(source_name)
                # EAC, add the source directly to the model
                like.logLike.addSource(source)
                srcs_to_merge.append(source_name)
            except KeyError:
                missing_sources.append(source_name)

        # Merge only the sources that were actually added to the model; the
        # ones that raised KeyError above are reported below instead of being
        # requested from a model that does not contain them.
        comp = like.mergeSources(args.merged, srcs_to_merge, 'ConstantValue')
        # Return value unused; presumably this call makes sure the merged
        # source map exists before it is saved -- TODO confirm.
        like.logLike.getSourceMap(comp.getName())

        print("Merged %i sources into %s" % (len(srcs_to_merge), comp.getName()))
        if missing_sources:
            print("Missed sources: ", missing_sources)

        print("Writing output source map file %s" % args.outfile)
        like.logLike.saveSourceMaps(args.outfile, False, False)
        if args.gzip:
            # Argument list instead of a shell string, so file names with
            # spaces or shell metacharacters reach gzip unchanged.  Failure
            # is reported but does not abort the job (compression is
            # best-effort, as before).
            try:
                status = subprocess.call(['gzip', '-9', args.outfile])
            except OSError as exc:
                print("Failed to run gzip on %s: %s" % (args.outfile, exc))
            else:
                if status != 0:
                    print("gzip exited with status %i for %s" % (status, args.outfile))

        print("Writing output xml file %s" % args.outxml)
        like.writeXml(args.outxml)
class MergeSrcmaps_SG(ScatterGather):
    """Small class to generate configurations for `GtMergeSrcmaps`
    """
    appname = 'fermipy-merge-srcmaps-sg'
    usage = "%s [options]" % (appname)
    description = "Merge diffuse maps for all-sky analysis"
    clientclass = GtMergeSrcmaps

    job_time = 300

    default_options = dict(comp=diffuse_defaults.diffuse['comp'],
                           data=diffuse_defaults.diffuse['data'],
                           library=diffuse_defaults.diffuse['library'])

    __doc__ += Link.construct_docstring(default_options)

    def build_job_configs(self, args):
        """Hook to build job configurations

        Builds one configuration per combination of a library entry whose
        ``model_type`` is 'CompositeSource' and a binning component read
        from ``args['comp']``.

        Parameters
        ----------
        args : dict
            Options; the keys 'comp', 'data' and 'library' are read.

        Returns
        -------
        dict
            Job key -> dictionary of options for one `GtMergeSrcmaps` job.
        """
        configs = {}

        binning_comps = Component.build_from_yamlfile(args['comp'])
        NAME_FACTORY.update_base_dict(args['data'])

        catalog_ret = make_catalog_comp_dict(sources=args['library'], basedir='.')
        info_by_split = catalog_ret['comp_info_dict']

        for split_ver, split_dict in info_by_split.items():
            for source_key, source_dict in split_dict.items():
                full_key = "%s_%s" % (split_ver, source_key)
                catalog_name = source_dict.catalog_info.catalog_name
                merged_name = "%s_%s" % (catalog_name, source_key)
                # Only composite sources get a merge job.
                if source_dict.model_type != 'CompositeSource':
                    continue

                for comp in binning_comps:
                    zcut = "zmax%i" % comp.zmax
                    comp_key = comp.make_key('{ebin_name}_{evtype_name}')
                    job_key = "%s_%s" % (full_key, comp_key)

                    # Keys for the files belonging to the merged source itself.
                    name_keys = {
                        'zcut': zcut,
                        'sourcekey': full_key,
                        'ebin': comp.ebin_name,
                        'psftype': comp.evtype_name,
                        'coordsys': comp.coordsys,
                        'mktime': 'none',
                        'irf_ver': NAME_FACTORY.irf_ver(),
                    }
                    # Same keys, but pointing at the catalog-level source maps
                    # used as the job's input.
                    nested_name_keys = dict(name_keys, sourcekey=catalog_name)

                    outfile = NAME_FACTORY.srcmaps(**name_keys)
                    logfile = make_nfs_path(outfile.replace('.fits', '.log'))

                    configs[job_key] = {
                        'srcmaps': NAME_FACTORY.srcmaps(**nested_name_keys),
                        'expcube': NAME_FACTORY.ltcube(**name_keys),
                        'irfs': NAME_FACTORY.irfs(**name_keys),
                        'bexpmap': NAME_FACTORY.bexpcube(**name_keys),
                        'srcmdl': NAME_FACTORY.srcmdl_xml(**name_keys),
                        'merged': merged_name,
                        'outfile': outfile,
                        'outxml': NAME_FACTORY.nested_srcmdl_xml(**name_keys),
                        'logfile': logfile,
                    }

        return configs
def register_merge_srcmaps():
    """Register these classes with the `LinkFactory` """
    # Same order as before: the link class first, then its scatter-gather.
    for link_class in (GtMergeSrcmaps, MergeSrcmaps_SG):
        link_class.register_class()