#!/usr/bin/env python
# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
Run gtsrcmaps for a single energy plane for a single source
This is useful to parallize the production of the source maps
"""
from __future__ import absolute_import, division, print_function
import os
import sys
from shutil import copyfile
import numpy as np
import glob
from astropy import units as u
from astropy.coordinates import SkyCoord, ICRS, Galactic
from gammapy.maps import WcsGeom
from fermipy.utils import load_yaml, write_yaml, init_matplotlib_backend
from fermipy import utils
from fermipy.jobs.utils import is_null, is_not_null
from fermipy.jobs.link import Link
from fermipy.jobs.scatter_gather import ScatterGather
from fermipy.jobs.slac_impl import make_nfs_path
from fermipy.jobs.analysis_utils import add_source_get_correlated
from fermipy.jobs.name_policy import NameFactory
from fermipy.jobs import defaults
init_matplotlib_backend('Agg')
try:
from fermipy.gtanalysis import GTAnalysis
HAVE_ST = True
except ImportError:
HAVE_ST = False
NAME_FACTORY = NameFactory(basedir=('.'))
[docs]class CopyBaseROI(Link):
"""Small class to copy a baseline ROI to a simulation area
This is useful for parallelizing analysis using the fermipy.jobs module.
"""
appname = 'fermipy-copy-base-roi'
linkname_default = 'copy-base-roi'
usage = '%s [options]' % (appname)
description = "Copy a baseline ROI to a simulation area"
default_options = dict(ttype=defaults.common['ttype'],
target=defaults.common['target'],
roi_baseline=defaults.common['roi_baseline'],
extracopy=defaults.sims['extracopy'],
sim=defaults.sims['sim'])
copyfiles = ['srcmap_*.fits', 'ccube.fits', 'ccube_*.fits']
__doc__ += Link.construct_docstring(default_options)
[docs] @classmethod
def copy_analysis_files(cls, orig_dir, dest_dir, copyfiles):
""" Copy a list of files from orig_dir to dest_dir"""
for pattern in copyfiles:
glob_path = os.path.join(orig_dir, pattern)
files = glob.glob(glob_path)
for ff in files:
f = os.path.basename(ff)
orig_path = os.path.join(orig_dir, f)
dest_path = os.path.join(dest_dir, f)
try:
copyfile(orig_path, dest_path)
except IOError:
sys.stderr.write("WARNING: failed to copy %s\n" % orig_path)
[docs] @classmethod
def copy_target_dir(cls, orig_dir, dest_dir, roi_baseline, extracopy):
""" Create and populate directoris for target analysis
"""
try:
os.makedirs(dest_dir)
except OSError:
pass
copyfiles = ['%s.fits' % roi_baseline,
'%s.npy' % roi_baseline,
'%s_*.xml' % roi_baseline] + cls.copyfiles
if isinstance(extracopy, list):
copyfiles += extracopy
cls.copy_analysis_files(orig_dir, dest_dir, copyfiles)
[docs] def run_analysis(self, argv):
"""Run this analysis"""
args = self._parser.parse_args(argv)
name_keys = dict(target_type=args.ttype,
target_name=args.target,
sim_name=args.sim,
fullpath=True)
orig_dir = NAME_FACTORY.targetdir(**name_keys)
dest_dir = NAME_FACTORY.sim_targetdir(**name_keys)
self.copy_target_dir(orig_dir, dest_dir,
args.roi_baseline, args.extracopy)
[docs]class CopyBaseROI_SG(ScatterGather):
"""Small class to generate configurations for this script
This adds the following arguments:
"""
appname = 'fermipy-copy-base-roi-sg'
usage = "%s [options]" % (appname)
description = "Run analyses on a series of ROIs"
clientclass = CopyBaseROI
job_time = 60
default_options = dict(ttype=defaults.common['ttype'],
targetlist=defaults.common['targetlist'],
roi_baseline=defaults.common['roi_baseline'],
sim=defaults.sims['sim'],
extracopy=defaults.sims['extracopy'])
__doc__ += Link.construct_docstring(default_options)
[docs] def build_job_configs(self, args):
"""Hook to build job configurations
"""
job_configs = {}
ttype = args['ttype']
(sim_targets_yaml, sim) = NAME_FACTORY.resolve_targetfile(args)
targets = load_yaml(sim_targets_yaml)
base_config = dict(ttype=ttype,
roi_baseline=args['roi_baseline'],
extracopy = args['extracopy'],
sim=sim)
for target_name in targets.keys():
targetdir = NAME_FACTORY.sim_targetdir(target_type=ttype,
target_name=target_name,
sim_name=sim)
logfile = os.path.join(targetdir, 'copy_base_dir.log')
job_config = base_config.copy()
job_config.update(dict(target=target_name,
logfile=logfile))
job_configs[target_name] = job_config
return job_configs
[docs]class RandomDirGen(Link):
"""Small class to generate random sky directions inside an ROI
This is useful for parallelizing analysis using the fermipy.jobs module.
"""
appname = 'fermipy-random-dir-gen'
linkname_default = 'random-dir-gen'
usage = '%s [options]' % (appname)
description = "Generate random sky directions in an ROI"
default_options = dict(config=defaults.common['config'],
rand_config=defaults.sims['rand_config'],
outfile=defaults.generic['outfile'])
__doc__ += Link.construct_docstring(default_options)
@staticmethod
def _make_wcsgeom_from_config(config):
"""Build a `WCS.Geom` object from a fermipy coniguration file"""
binning = config['binning']
binsz = binning['binsz']
coordsys = binning.get('coordsys', 'GAL')
roiwidth = binning['roiwidth']
proj = binning.get('proj', 'AIT')
ra = config['selection']['ra']
dec = config['selection']['dec']
npix = int(np.round(roiwidth / binsz))
skydir = SkyCoord(ra * u.deg, dec * u.deg)
wcsgeom = WcsGeom.create(npix=npix, binsz=binsz,
proj=proj, coordsys=coordsys,
skydir=skydir)
return wcsgeom
@staticmethod
def _build_skydir_dict(wcsgeom, rand_config):
"""Build a dictionary of random directions"""
step_x = rand_config['step_x']
step_y = rand_config['step_y']
max_x = rand_config['max_x']
max_y = rand_config['max_y']
seed = rand_config['seed']
nsims = rand_config['nsims']
cdelt = wcsgeom.wcs.wcs.cdelt
pixstep_x = step_x / cdelt[0]
pixstep_y = -1. * step_y / cdelt[1]
pixmax_x = max_x / cdelt[0]
pixmax_y = max_y / cdelt[0]
nstep_x = int(np.ceil(2. * pixmax_x / pixstep_x)) + 1
nstep_y = int(np.ceil(2. * pixmax_y / pixstep_y)) + 1
center = np.array(wcsgeom.center_pix)
grid = np.meshgrid(np.linspace(-1 * pixmax_x, pixmax_x, nstep_x),
np.linspace(-1 * pixmax_y, pixmax_y, nstep_y))
grid[0] += center[0]
grid[1] += center[1]
test_grid = wcsgeom.pix_to_coord(grid)
glat_vals = test_grid[0].flat
glon_vals = test_grid[1].flat
conv_vals = SkyCoord(glat_vals * u.deg, glon_vals *
u.deg, frame=Galactic).transform_to(ICRS)
ra_vals = conv_vals.ra.deg[seed:nsims]
dec_vals = conv_vals.dec.deg[seed:nsims]
o_dict = {}
for i, (ra, dec) in enumerate(zip(ra_vals, dec_vals)):
key = i + seed
o_dict[key] = dict(ra=ra, dec=dec)
return o_dict
[docs] def run_analysis(self, argv):
"""Run this analysis"""
args = self._parser.parse_args(argv)
if is_null(args.config):
raise ValueError("Config yaml file must be specified")
if is_null(args.rand_config):
raise ValueError(
"Random direction config yaml file must be specified")
config = load_yaml(args.config)
rand_config = load_yaml(args.rand_config)
wcsgeom = self._make_wcsgeom_from_config(config)
dir_dict = self._build_skydir_dict(wcsgeom, rand_config)
if is_not_null(args.outfile):
write_yaml(dir_dict, args.outfile)
[docs]class SimulateROI(Link):
"""Small class wrap an analysis script.
This is useful for parallelizing analysis using the fermipy.jobs module.
"""
appname = 'fermipy-simulate-roi'
linkname_default = 'simulate-roi'
usage = '%s [options]' % (appname)
description = "Run simulated analysis of a single ROI"
default_options = dict(config=defaults.common['config'],
roi_baseline=defaults.common['roi_baseline'],
profiles=defaults.common['profiles'],
non_null_src=defaults.common['non_null_src'],
do_find_src=defaults.common['do_find_src'],
sim_profile=defaults.sims['sim_profile'],
sim=defaults.sims['sim'],
nsims=defaults.sims['nsims'],
seed=defaults.sims['seed'])
__doc__ += Link.construct_docstring(default_options)
@staticmethod
def _clone_config_and_srcmaps(config_path, seed):
"""Clone the configuration"""
workdir = os.path.dirname(config_path)
new_config_path = config_path.replace('.yaml', '_%06i.yaml' % seed)
config = load_yaml(config_path)
comps = config.get('components', [config])
for i, comp in enumerate(comps):
comp_name = "%02i" % i
if 'gtlike' not in comp:
comp['gtlike'] = {}
orig_srcmap = os.path.abspath(os.path.join(workdir, 'srcmap_%s.fits' % (comp_name)))
new_srcmap = os.path.abspath(os.path.join(workdir, 'srcmap_%06i_%s.fits' % (seed, comp_name)))
comp['gtlike']['srcmap'] = os.path.abspath(os.path.join(workdir, 'srcmap_%06i_%s.fits' % (seed, comp_name)))
comp['gtlike']['use_external_srcmap'] = True
copyfile(orig_srcmap, new_srcmap)
write_yaml(config, new_config_path)
return new_config_path
@staticmethod
def _run_simulation(gta, roi_baseline,
injected_name, test_sources, current_seed, seed,
non_null_src, do_find_src = False):
"""Simulate a realization of this analysis"""
gta.load_roi('sim_baseline_%06i.npy' % current_seed)
gta.set_random_seed(seed)
gta.simulate_roi()
if injected_name:
gta.zero_source(injected_name)
gta.optimize()
if do_find_src:
gta.find_sources(sqrt_ts_threshold=5.0, search_skydir=gta.roi.skydir,
search_minmax_radius=[1.0, np.nan])
gta.optimize()
gta.free_sources(skydir=gta.roi.skydir, distance=1.0, pars='norm')
if injected_name:
gta.free_source(injected_name, False)
gta.fit(covar=True)
gta.write_roi('sim_refit_%06i' % current_seed)
for pkey, test_source in test_sources.items():
test_source_name = test_source['name']
sedfile = "sed_%s_%06i.fits" % (test_source_name, seed)
correl_dict, test_src_name = add_source_get_correlated(gta, test_source_name,
test_source['source_model'],
correl_thresh=0.25,
non_null_src=non_null_src)
# Write the list of correlated sources
correl_yaml = os.path.join(gta.workdir,
"correl_%s_%06i.yaml" % (test_source_name, seed))
write_yaml(correl_dict, correl_yaml)
gta.free_sources(False)
for src_name in correl_dict.keys():
gta.free_source(src_name, pars='norm')
if injected_name:
gta.free_source(injected_name, False)
gta.sed(test_source_name, outfile=sedfile)
# Set things back to how they were
gta.delete_source(test_source_name)
gta.load_xml('sim_refit_%06i' % current_seed)
[docs] def run_analysis(self, argv):
"""Run this analysis"""
args = self._parser.parse_args(argv)
if not HAVE_ST:
raise RuntimeError(
"Trying to run fermipy analysis, but don't have ST")
workdir = os.path.dirname(args.config)
_config_file = self._clone_config_and_srcmaps(args.config, args.seed)
gta = GTAnalysis(_config_file, logging={'verbosity': 3},
fileio={'workdir_regex': '\.xml$|\.npy$'})
gta.load_roi(args.roi_baseline)
simfile = os.path.join(workdir, 'sim_%s_%s.yaml' %
(args.sim, args.sim_profile))
mcube_file = "%s_%s_%06i" % (args.sim, args.sim_profile, args.seed)
sim_config = utils.load_yaml(simfile)
injected_source = sim_config.get('injected_source', None)
if injected_source is not None:
src_dict = injected_source['source_model']
src_dict['ra'] = gta.config['selection']['ra']
src_dict['dec'] = gta.config['selection']['dec']
injected_name = injected_source['name']
gta.add_source(injected_name, src_dict)
gta.write_model_map(mcube_file)
mc_spec_dict = dict(true_counts=gta.model_counts_spectrum(injected_name),
energies=gta.energies,
model=src_dict)
mcspec_file = os.path.join(workdir,
"mcspec_%s_%06i.yaml" % (mcube_file, args.seed))
utils.write_yaml(mc_spec_dict, mcspec_file)
else:
injected_name = None
gta.write_roi('sim_baseline_%06i' % args.seed)
test_sources = {}
for profile in args.profiles:
profile_path = os.path.join(workdir, 'profile_%s.yaml' % profile)
test_source = load_yaml(profile_path)
test_sources[profile] = test_source
first = args.seed
last = first + args.nsims
for seed in range(first, last):
self._run_simulation(gta, args.roi_baseline,
injected_name, test_sources, first, seed,
non_null_src=args.non_null_src,
do_find_src=args.do_find_src)
[docs]class RandomDirGen_SG(ScatterGather):
"""Small class to generate configurations for this script
This adds the following arguments:
"""
appname = 'fermipy-random-dir-gen-sg'
usage = "%s [options]" % (appname)
description = "Run analyses on a series of ROIs"
clientclass = RandomDirGen
job_time = 60
default_options = dict(ttype=defaults.common['ttype'],
targetlist=defaults.common['targetlist'],
config=defaults.common['config'],
rand_config=defaults.sims['rand_config'],
sim=defaults.sims['sim'])
__doc__ += Link.construct_docstring(default_options)
[docs] def build_job_configs(self, args):
"""Hook to build job configurations
"""
job_configs = {}
ttype = args['ttype']
(targets_yaml, sim) = NAME_FACTORY.resolve_targetfile(args)
if targets_yaml is None:
return job_configs
config_yaml = 'config.yaml'
config_override = args.get('config')
if is_not_null(config_override):
config_yaml = config_override
rand_yaml = NAME_FACTORY.resolve_randconfig(args)
targets = load_yaml(targets_yaml)
base_config = dict(rand_config=rand_yaml)
for target_name in targets.keys():
name_keys = dict(target_type=ttype,
target_name=target_name,
sim_name=sim,
fullpath=True)
simdir = NAME_FACTORY.sim_targetdir(**name_keys)
config_path = os.path.join(simdir, config_yaml)
outfile = os.path.join(simdir, 'skydirs.yaml')
logfile = make_nfs_path(outfile.replace('yaml', 'log'))
job_config = base_config.copy()
job_config.update(dict(config=config_path,
outfile=outfile,
logfile=logfile))
job_configs[target_name] = job_config
return job_configs
[docs]class SimulateROI_SG(ScatterGather):
"""Small class to generate configurations for this script
This adds the following arguments:
"""
appname = 'fermipy-simulate-roi-sg'
usage = "%s [options]" % (appname)
description = "Run analyses on a series of ROIs"
clientclass = SimulateROI
job_time = 1500
default_options = dict(ttype=defaults.common['ttype'],
targetlist=defaults.common['targetlist'],
config=defaults.common['config'],
roi_baseline=defaults.common['roi_baseline'],
non_null_src=defaults.common['non_null_src'],
do_find_src=defaults.common['do_find_src'],
sim=defaults.sims['sim'],
sim_profile=defaults.sims['sim_profile'],
nsims=defaults.sims['nsims'],
seed=defaults.sims['seed'],
nsims_job=defaults.sims['nsims_job'])
__doc__ += Link.construct_docstring(default_options)
[docs] def build_job_configs(self, args):
"""Hook to build job configurations
"""
job_configs = {}
ttype = args['ttype']
(targets_yaml, sim) = NAME_FACTORY.resolve_targetfile(args)
if targets_yaml is None:
return job_configs
config_yaml = 'config.yaml'
config_override = args.get('config')
if is_not_null(config_override):
config_yaml = config_override
targets = load_yaml(targets_yaml)
nsims_job = args['nsims_job']
first_seed = args['seed']
nsims = args['nsims']
last_seed = first_seed + nsims
base_config = dict(sim_profile=args['sim_profile'],
roi_baseline=args['roi_baseline'],
non_null_src=args['non_null_src'],
do_find_src=args['do_find_src'],
sim=sim)
for target_name, target_list in targets.items():
name_keys = dict(target_type=ttype,
target_name=target_name,
sim_name=sim,
fullpath=True)
simdir = NAME_FACTORY.sim_targetdir(**name_keys)
config_path = os.path.join(simdir, config_yaml)
job_config = base_config.copy()
job_config.update(dict(config=config_path,
profiles=target_list))
current_seed = first_seed
while current_seed < last_seed:
fullkey = "%s_%06i" % (target_name, current_seed)
logfile = make_nfs_path(os.path.join(simdir, "%s_%s_%06i.log" % (self.linkname,
target_name, current_seed)))
if nsims_job <= 0 or current_seed + nsims_job >= last_seed:
nsims_current = last_seed - current_seed
else:
nsims_current = nsims_job
job_config.update(dict(seed=current_seed,
nsims=nsims_current,
logfile=logfile))
job_configs[fullkey] = job_config.copy()
current_seed += nsims_current
return job_configs
def register_classes():
"""Register these classes with the `LinkFactory` """
CopyBaseROI.register_class()
CopyBaseROI_SG.register_class()
SimulateROI.register_class()
SimulateROI_SG.register_class()
RandomDirGen.register_class()
RandomDirGen_SG.register_class()