# Licensed under a 3-clause BSD style license - see LICENSE.rst
from __future__ import absolute_import, division, print_function
import os
import copy
import re
import collections
import numpy as np
import xml.etree.cElementTree as ElementTree
from astropy import units as u
from astropy.coordinates import SkyCoord
from astropy.io import fits
from astropy.table import Table, Column
import fermipy
import fermipy.config
from fermipy import utils
from fermipy import wcs_utils
from fermipy import catalog
from fermipy import defaults
from fermipy import model_utils
from fermipy.logger import Logger, log_level
from fermipy.model_utils import make_parameter_dict
from fermipy.model_utils import cast_pars_dict
from fermipy.model_utils import get_function_defaults
from fermipy.model_utils import get_spatial_type
from fermipy.model_utils import get_function_norm_par_name
from fermipy.model_utils import get_function_par_names
from fermipy.model_utils import extract_pars_from_dict
from fermipy.model_utils import create_pars_from_dict
[docs]def create_source_table(scan_shape):
"""Create an empty source table.
Returns
-------
tab : `~astropy.table.Table`
"""
cols_dict = collections.OrderedDict()
cols_dict['Source_Name'] = dict(dtype='S48', format='%s')
cols_dict['name'] = dict(dtype='S48', format='%s')
cols_dict['class'] = dict(dtype='S32', format='%s')
cols_dict['SpectrumType'] = dict(dtype='S32', format='%s')
cols_dict['SpatialType'] = dict(dtype='S32', format='%s')
cols_dict['SourceType'] = dict(dtype='S32', format='%s')
cols_dict['SpatialModel'] = dict(dtype='S32', format='%s')
cols_dict['RAJ2000'] = dict(dtype='f8', format='%.3f', unit='deg')
cols_dict['DEJ2000'] = dict(dtype='f8', format='%.3f', unit='deg')
cols_dict['GLON'] = dict(dtype='f8', format='%.3f', unit='deg')
cols_dict['GLAT'] = dict(dtype='f8', format='%.3f', unit='deg')
cols_dict['ts'] = dict(dtype='f8', format='%.3f')
cols_dict['loglike'] = dict(dtype='f8', format='%.3f')
cols_dict['npred'] = dict(dtype='f8', format='%.3f')
cols_dict['offset'] = dict(dtype='f8', format='%.3f', unit='deg')
cols_dict['offset_ra'] = dict(dtype='f8', format='%.3f', unit='deg')
cols_dict['offset_dec'] = dict(dtype='f8', format='%.3f', unit='deg')
cols_dict['offset_glon'] = dict(dtype='f8', format='%.3f', unit='deg')
cols_dict['offset_glat'] = dict(dtype='f8', format='%.3f', unit='deg')
cols_dict['offset_roi_edge'] = dict(dtype='f8', format='%.3f', unit='deg')
cols_dict['pivot_energy'] = dict(dtype='f8', format='%.3f', unit='MeV')
cols_dict['flux_scan'] = dict(dtype='f8', format='%.3f',
shape=scan_shape)
cols_dict['eflux_scan'] = dict(dtype='f8', format='%.3f',
shape=scan_shape)
cols_dict['norm_scan'] = dict(dtype='f8', format='%.3f',
shape=scan_shape)
cols_dict['dloglike_scan'] = dict(dtype='f8', format='%.3f',
shape=scan_shape)
cols_dict['loglike_scan'] = dict(dtype='f8', format='%.3f',
shape=scan_shape)
# Add source dictionary columns
for k, v in sorted(defaults.source_output.items()):
if not k in cols_dict.keys():
if v[2] == float:
cols_dict[k] = dict(dtype='f8', format='%f')
elif k == 'Spectrum_Filename' or k == 'Spatial_Filename':
cols_dict[k] = dict(dtype='S128', format='%s')
elif v[2] == str:
cols_dict[k] = dict(dtype='S32', format='%s')
cols_dict['param_names'] = dict(dtype='S32', format='%s', shape=(10,))
cols_dict['param_values'] = dict(dtype='f8', format='%f', shape=(10,))
cols_dict['param_errors'] = dict(dtype='f8', format='%f', shape=(10,))
# Catalog Parameters
cols_dict['Flux_Density'] = dict(
dtype='f8', format='%.5g', unit='1 / (MeV cm2 s)')
cols_dict['Spectral_Index'] = dict(dtype='f8', format='%.3f')
cols_dict['Pivot_Energy'] = dict(dtype='f8', format='%.3f', unit='MeV')
cols_dict['beta'] = dict(dtype='f8', format='%.3f')
cols_dict['Exp_Index'] = dict(dtype='f8', format='%.3f')
cols_dict['Cutoff'] = dict(dtype='f8', format='%.3f', unit='MeV')
cols_dict['Conf_68_PosAng'] = dict(dtype='f8', format='%.3f', unit='deg')
cols_dict['Conf_68_SemiMajor'] = dict(
dtype='f8', format='%.3f', unit='deg')
cols_dict['Conf_68_SemiMinor'] = dict(
dtype='f8', format='%.3f', unit='deg')
cols_dict['Conf_95_PosAng'] = dict(dtype='f8', format='%.3f', unit='deg')
cols_dict['Conf_95_SemiMajor'] = dict(
dtype='f8', format='%.3f', unit='deg')
cols_dict['Conf_95_SemiMinor'] = dict(
dtype='f8', format='%.3f', unit='deg')
for t in ['eflux', 'eflux100', 'eflux1000', 'eflux10000']:
cols_dict[t] = dict(dtype='f8', format='%.3f', unit='MeV / (cm2 s)')
cols_dict[t + '_err'] = dict(dtype='f8',
format='%.3f', unit='MeV / (cm2 s)')
for t in ['eflux_ul95', 'eflux100_ul95', 'eflux1000_ul95', 'eflux10000_ul95']:
cols_dict[t] = dict(dtype='f8', format='%.3f', unit='MeV / (cm2 s)')
for t in ['flux', 'flux100', 'flux1000', 'flux10000']:
cols_dict[t] = dict(dtype='f8', format='%.3f', unit='1 / (cm2 s)')
cols_dict[t + '_err'] = dict(dtype='f8',
format='%.3f', unit='1 / (cm2 s)')
for t in ['flux_ul95', 'flux100_ul95', 'flux1000_ul95', 'flux10000_ul95']:
cols_dict[t] = dict(dtype='f8', format='%.3f', unit='1 / (cm2 s)')
for t in ['dnde', 'dnde100', 'dnde1000', 'dnde10000']:
cols_dict[t] = dict(dtype='f8', format='%.3f', unit='1 / (MeV cm2 s)')
cols_dict[t + '_err'] = dict(dtype='f8',
format='%.3f', unit='1 / (MeV cm2 s)')
cols = [Column(name=k, **v) for k, v in cols_dict.items()]
tab = Table(cols)
return tab
[docs]def get_skydir_distance_mask(src_skydir, skydir, dist, min_dist=None,
square=False, coordsys='CEL'):
"""Retrieve sources within a certain angular distance of an
(ra,dec) coordinate. This function supports two types of
geometric selections: circular (square=False) and square
(square=True). The circular selection finds all sources with a given
angular distance of the target position. The square selection
finds sources within an ROI-like region of size R x R where R
= 2 x dist.
Parameters
----------
src_skydir : `~astropy.coordinates.SkyCoord`
Array of sky directions.
skydir : `~astropy.coordinates.SkyCoord`
Sky direction with respect to which the selection will be applied.
dist : float
Maximum distance in degrees from the sky coordinate.
square : bool
Choose whether to apply a circular or square selection.
coordsys : str
Coordinate system to use when applying a selection with square=True.
"""
if dist is None:
dist = 180.
if not square:
dtheta = src_skydir.separation(skydir).rad
elif coordsys == 'CEL':
dtheta = get_linear_dist(skydir,
src_skydir.ra.rad,
src_skydir.dec.rad,
coordsys=coordsys)
elif coordsys == 'GAL':
dtheta = get_linear_dist(skydir,
src_skydir.galactic.l.rad,
src_skydir.galactic.b.rad,
coordsys=coordsys)
else:
raise Exception('Unrecognized coordinate system: %s' % coordsys)
msk = (dtheta < np.radians(dist))
if min_dist is not None:
msk &= (dtheta > np.radians(min_dist))
return msk
[docs]def get_linear_dist(skydir, lon, lat, coordsys='CEL'):
xy = wcs_utils.sky_to_offset(skydir, np.degrees(lon), np.degrees(lat),
coordsys=coordsys)
x = np.radians(xy[:, 0])
y = np.radians(xy[:, 1])
delta = np.array([np.abs(x), np.abs(y)])
dtheta = np.max(delta, axis=0)
return dtheta
[docs]def get_dist_to_edge(skydir, lon, lat, width, coordsys='CEL'):
xy = wcs_utils.sky_to_offset(skydir, np.degrees(lon), np.degrees(lat),
coordsys=coordsys)
x = np.radians(xy[:, 0])
y = np.radians(xy[:, 1])
delta_edge = np.array([np.abs(x) - width, np.abs(y) - width])
dtheta = np.max(delta_edge, axis=0)
return dtheta
[docs]def get_true_params_dict(pars_dict):
params = {}
for k, p in pars_dict.items():
val = p['value'] * p['scale']
err = np.nan
if 'error' in p:
err = p['error'] * np.abs(p['scale'])
params[k] = {'value': val, 'error': err}
return params
[docs]def spectral_pars_from_catalog(cat):
"""Create spectral parameters from 3FGL catalog columns."""
spectrum_type = cat['SpectrumType']
pars = get_function_defaults(cat['SpectrumType'])
if spectrum_type == 'PowerLaw':
pars['Prefactor']['value'] = cat['Flux_Density']
pars['Scale']['value'] = cat['Pivot_Energy']
pars['Scale']['scale'] = 1.0
pars['Index']['value'] = cat['Spectral_Index']
pars['Index']['max'] = max(5.0, pars['Index']['value'] + 1.0)
pars['Index']['min'] = min(0.0, pars['Index']['value'] - 1.0)
pars['Index']['scale'] = -1.0
pars['Prefactor'] = make_parameter_dict(pars['Prefactor'])
pars['Scale'] = make_parameter_dict(pars['Scale'], True, False)
pars['Index'] = make_parameter_dict(pars['Index'], False, False)
elif spectrum_type == 'LogParabola':
pars['norm']['value'] = cat['Flux_Density']
pars['Eb']['value'] = cat['Pivot_Energy']
pars['alpha']['value'] = cat['Spectral_Index']
pars['beta']['value'] = cat['beta']
pars['norm'] = make_parameter_dict(pars['norm'], False, True)
pars['Eb'] = make_parameter_dict(pars['Eb'], True, False)
pars['alpha'] = make_parameter_dict(pars['alpha'], False, False)
pars['beta'] = make_parameter_dict(pars['beta'], False, False)
elif spectrum_type == 'PLSuperExpCutoff':
flux_density = cat['Flux_Density']
prefactor = (cat['Flux_Density'] *
np.exp((cat['Pivot_Energy'] / cat['Cutoff']) **
cat['Exp_Index']))
pars['Prefactor']['value'] = prefactor
pars['Index1']['value'] = cat['Spectral_Index']
pars['Index1']['scale'] = -1.0
pars['Index2']['value'] = cat['Exp_Index']
pars['Index2']['scale'] = 1.0
pars['Scale']['value'] = cat['Pivot_Energy']
pars['Cutoff']['value'] = cat['Cutoff']
pars['Prefactor'] = make_parameter_dict(pars['Prefactor'])
pars['Scale'] = make_parameter_dict(pars['Scale'], True, False)
pars['Index1'] = make_parameter_dict(pars['Index1'], False, False)
pars['Index2'] = make_parameter_dict(pars['Index2'], False, False)
pars['Cutoff'] = make_parameter_dict(pars['Cutoff'], False, True)
else:
raise Exception('Unsupported spectral type:' + spectrum_type)
return pars
[docs]class Model(object):
"""Base class for point-like and diffuse source components. This
class is a container for spectral and spatial parameters as well
as other source properties such as TS, Npred, and location within
the ROI.
"""
def __init__(self, name, data):
self._data = defaults.make_default_dict(defaults.source_output)
self._data['spectral_pars'] = get_function_defaults(
data['SpectrumType'])
self._data['spatial_pars'] = get_function_defaults(data['SpatialType'])
self._data.setdefault('catalog', data.pop('catalog', {}))
self._data.setdefault('assoc', data.pop('assoc', {}))
self._data.setdefault('class', '')
self._data['name'] = name
self._data.setdefault('psf_scale_fn', None)
self._data = utils.merge_dict(self._data, data)
self._names = [name]
catalog = self._data['catalog']
if 'CLASS1' in catalog:
self['class'] = catalog['CLASS1'].strip()
elif 'CLASS' in catalog:
self['class'] = catalog['CLASS'].strip()
for k in ROIModel.src_name_cols:
if k not in catalog:
continue
name = catalog[k].strip()
if name != '' and name not in self._names:
self._names.append(name)
self._data['assoc'][k] = name
self._sync_params()
def __contains__(self, key):
return key in self._data
def __getitem__(self, key):
return self._data[key]
def __setitem__(self, key, value):
self._data[key] = value
def __eq__(self, other):
return self.name == other.name
def __str__(self):
data = copy.deepcopy(self.data)
data['names'] = self.names
output = []
output += ['{:15s}:'.format('Name') + ' {name:s}']
output += ['{:15s}:'.format('TS') + ' {ts:.2f}']
output += ['{:15s}:'.format('Npred') + ' {npred:.2f}']
output += ['{:15s}:'.format('SpatialModel') + ' {SpatialModel:s}']
output += ['{:15s}:'.format('SpectrumType') + ' {SpectrumType:s}']
output += ['Spectral Parameters']
for i, p in enumerate(self['param_names']):
if not p:
break
val = self['param_values'][i]
err = self['param_errors'][i]
output += ['{:15s}: {:10.4g} +/- {:10.4g}'.format(p, val, err)]
return '\n'.join(output).format(**data)
[docs] def items(self):
return self._data.items()
@property
def data(self):
return self._data
@property
def spectral_pars(self):
return self._data['spectral_pars']
@property
def spatial_pars(self):
return self._data['spatial_pars']
@property
def params(self):
return get_true_params_dict(self._data['spectral_pars'])
@property
def name(self):
return self._data['name']
@property
def names(self):
return self._names
@property
def assoc(self):
return self._data['assoc']
@property
def psf_scale_fn(self):
return self._data['psf_scale']
@staticmethod
[docs] def create_from_dict(src_dict, roi_skydir=None, rescale=False):
src_dict = copy.deepcopy(src_dict)
src_dict.setdefault('SpatialModel', 'PointSource')
src_dict.setdefault('SpatialType',
get_spatial_type(src_dict['SpatialModel']))
# Need this to handle old conventions for
# MapCubeFunction/ConstantValue sources
if src_dict['SpatialModel'] == 'DiffuseSource':
src_dict['SpatialModel'] = src_dict['SpatialType']
if 'filefunction' in src_dict:
src_dict['Spectrum_Filename'] = src_dict.pop('filefunction')
if 'mapcube' in src_dict:
src_dict['Spatial_Filename'] = src_dict.pop('mapcube')
if 'spectral_pars' in src_dict:
src_dict['spectral_pars'] = cast_pars_dict(
src_dict['spectral_pars'])
if 'spatial_pars' in src_dict:
src_dict['spatial_pars'] = cast_pars_dict(src_dict['spatial_pars'])
if src_dict['SpatialModel'] == 'ConstantValue':
return IsoSource(src_dict['name'], src_dict)
elif src_dict['SpatialModel'] == 'CompositeSource':
return CompositeSource(src_dict['name'], src_dict)
elif src_dict['SpatialModel'] == 'MapCubeFunction':
return MapCubeSource(src_dict['name'], src_dict)
else:
return Source.create_from_dict(src_dict, roi_skydir,
rescale=rescale)
def _sync_params(self):
pars = model_utils.pars_dict_to_vectors(self['SpectrumType'],
self.spectral_pars)
self._data.update(pars)
[docs] def get_norm(self):
par_name = get_function_norm_par_name(self['SpectrumType'])
val = self.spectral_pars[par_name]['value']
scale = self.spectral_pars[par_name]['scale']
return float(val) * float(scale)
[docs] def add_to_table(self, tab):
row_dict = {}
row_dict['Source_Name'] = self['name']
row_dict['RAJ2000'] = self['ra']
row_dict['DEJ2000'] = self['dec']
row_dict['GLON'] = self['glon']
row_dict['GLAT'] = self['glat']
if not 'param_names' in self.data:
pars = model_utils.pars_dict_to_vectors(self['SpectrumType'],
self.spectral_pars)
row_dict.update(pars)
r68_semimajor = self['pos_sigma_semimajor'] * \
self['pos_r68'] / self['pos_sigma']
r68_semiminor = self['pos_sigma_semiminor'] * \
self['pos_r68'] / self['pos_sigma']
r95_semimajor = self['pos_sigma_semimajor'] * \
self['pos_r95'] / self['pos_sigma']
r95_semiminor = self['pos_sigma_semiminor'] * \
self['pos_r95'] / self['pos_sigma']
row_dict['Conf_68_PosAng'] = self['pos_angle']
row_dict['Conf_68_SemiMajor'] = r68_semimajor
row_dict['Conf_68_SemiMinor'] = r68_semiminor
row_dict['Conf_95_PosAng'] = self['pos_angle']
row_dict['Conf_95_SemiMajor'] = r95_semimajor
row_dict['Conf_95_SemiMinor'] = r95_semiminor
row_dict.update(self.get_catalog_dict())
for t in self.data.keys():
if t == 'params':
continue
if t in tab.columns:
row_dict[t] = self[t]
row = [row_dict[k] for k in tab.columns]
tab.add_row(row)
[docs] def get_catalog_dict(self):
o = {'Spectral_Index': np.nan,
'Flux_Density': np.nan,
'Pivot_Energy': np.nan,
'beta': np.nan,
'Exp_Index': np.nan,
'Cutoff': np.nan}
params = get_true_params_dict(self.spectral_pars)
if self['SpectrumType'] == 'PowerLaw':
o['Spectral_Index'] = -1.0 * params['Index']['value']
o['Flux_Density'] = params['Prefactor']['value']
o['Pivot_Energy'] = params['Scale']['value']
elif self['SpectrumType'] == 'LogParabola':
o['Spectral_Index'] = params['alpha']['value']
o['Flux_Density'] = params['norm']['value']
o['Pivot_Energy'] = params['Eb']['value']
o['beta'] = params['beta']['value']
elif self['SpectrumType'] == 'PLSuperExpCutoff':
o['Spectral_Index'] = -1.0 * params['Index1']['value']
o['Exp_Index'] = params['Index2']['value']
o['Flux_Density'] = params['Prefactor']['value']
o['Pivot_Energy'] = params['Scale']['value']
o['Cutoff'] = params['Cutoff']['value']
return o
[docs] def check_cuts(self, cuts):
if cuts is None:
return True
if isinstance(cuts, tuple):
cuts = {cuts[0]: (cuts[1], cuts[2])}
elif isinstance(cuts, list):
cuts = {c[0]: (c[1], c[2]) for c in cuts}
for k, v in cuts.items():
# if not isinstance(c,tuple) or len(c) != 3:
# raise Exception('Wrong format for cuts tuple.')
if k in self._data:
if not utils.apply_minmax_selection(self[k], v):
return False
elif 'catalog' in self._data and k in self._data['catalog']:
if not utils.apply_minmax_selection(self['catalog'][k], v):
return False
else:
return False
return True
[docs] def set_psf_scale_fn(self, fn):
self._data['psf_scale_fn'] = fn
[docs] def set_spectral_pars(self, spectral_pars):
self._data['spectral_pars'] = copy.deepcopy(spectral_pars)
self._sync_params()
[docs] def update_spectral_pars(self, spectral_pars):
self._data['spectral_pars'] = utils.merge_dict(
self.spectral_pars, spectral_pars)
self._sync_params()
[docs] def set_name(self, name, names=None):
self._data['name'] = name
if names is None:
self._names = [name]
else:
self._names = names
[docs] def add_name(self, name):
if name not in self._names:
self._names.append(name)
[docs] def update_data(self, d):
self._data = utils.merge_dict(self._data, d, add_new_keys=True)
[docs] def update_from_source(self, src):
self._data['spectral_pars'] = {}
self._data['spatial_pars'] = {}
self._data = utils.merge_dict(self.data, src.data, add_new_keys=True)
self._name = src.name
self._names = list(set(self._names + src.names))
[docs]class IsoSource(Model):
def __init__(self, name, data):
data['SpectrumType'] = 'FileFunction'
data['SpatialType'] = 'ConstantValue'
data['SpatialModel'] = 'ConstantValue'
data['SourceType'] = 'DiffuseSource'
if not 'spectral_pars' in data:
data['spectral_pars'] = {
'Normalization': {'name': 'Normalization', 'scale': 1.0,
'value': 1.0,
'min': 0.001, 'max': 1000.0,
'free': False}}
super(IsoSource, self).__init__(name, data)
self._init_spatial_pars()
@property
def filefunction(self):
return self._data['Spectrum_Filename']
@property
def diffuse(self):
return True
def _init_spatial_pars(self):
self['spatial_pars'] = {
'Value': {'name': 'Value', 'scale': '1',
'value': '1', 'min': '0', 'max': '10',
'free': '0'}}
[docs] def write_xml(self, root):
source_element = utils.create_xml_element(root, 'source',
dict(name=self.name,
type='DiffuseSource'))
filename = utils.path_to_xmlpath(self.filefunction)
spec_el = utils.create_xml_element(source_element, 'spectrum',
dict(file=filename,
type='FileFunction',
ctype='-1'))
spat_el = utils.create_xml_element(source_element, 'spatialModel',
dict(type='ConstantValue'))
for k, v in self.spectral_pars.items():
utils.create_xml_element(spec_el, 'parameter', v)
for k, v in self.spatial_pars.items():
utils.create_xml_element(spat_el, 'parameter', v)
[docs]class MapCubeSource(Model):
def __init__(self, name, data):
data.setdefault('SpectrumType', 'PowerLaw')
data['SpatialType'] = 'MapCubeFunction'
data['SpatialModel'] = 'MapCubeFunction'
data['SourceType'] = 'DiffuseSource'
if not 'spectral_pars' in data:
data['spectral_pars'] = {
'Prefactor': {'name': 'Prefactor', 'scale': 1.0,
'value': 1.0, 'min': 0.1, 'max': '10.0',
'free': False},
'Index': {'name': 'Index', 'scale': -1.0,
'value': 0.0, 'min': -1.0, 'max': 1.0,
'free': False},
'Scale': {'name': 'Scale', 'scale': 1.0,
'value': 1000.0,
'min': 1000.0, 'max': 1000.0,
'free': False},
}
super(MapCubeSource, self).__init__(name, data)
self._init_spatial_pars()
@property
def mapcube(self):
return self._data['Spatial_Filename']
@property
def diffuse(self):
return True
def _init_spatial_pars(self):
self['spatial_pars'] = {
'Normalization':
{'name': 'Normalization', 'scale': '1',
'value': '1', 'min': '0', 'max': '10',
'free': '0'}}
[docs] def write_xml(self, root):
source_element = utils.create_xml_element(root, 'source',
dict(name=self.name,
type='DiffuseSource'))
spec_el = utils.create_xml_element(source_element, 'spectrum',
dict(type=self.data['SpectrumType']))
filename = utils.path_to_xmlpath(self.mapcube)
spat_el = utils.create_xml_element(source_element, 'spatialModel',
dict(type='MapCubeFunction',
file=filename))
for k, v in self.spectral_pars.items():
utils.create_xml_element(spec_el, 'parameter', v)
for k, v in self.spatial_pars.items():
utils.create_xml_element(spat_el, 'parameter', v)
[docs]class Source(Model):
"""Class representation of a source (non-diffuse) model component.
A source object serves as a container for the properties of that
source (position, spatial/spectral parameters, TS, etc.) as
derived in the current analysis. Most properties of a source
object can be accessed with the bracket operator:
# Return the TS of this source
>>> print src['ts']
# Get a skycoord representation of the source position
>>> print src.skydir
"""
def __init__(self, name, data, radec=None):
data.setdefault('SpatialModel', 'PointSource')
data.setdefault('SpectrumType', 'PowerLaw')
data.setdefault(
'SpatialType', model_utils.get_spatial_type(data['SpatialModel']))
data.setdefault(
'SourceType', model_utils.get_source_type(data['SpatialType']))
super(Source, self).__init__(name, data)
catalog = self.data.get('catalog', {})
if radec is not None:
self._set_radec(radec)
elif 'ra' in self.data and 'dec' in self.data:
self._set_radec([self.data['ra'], self.data['dec']])
elif 'RAJ2000' in catalog and 'DEJ2000' in catalog:
self._set_radec([catalog['RAJ2000'], catalog['DEJ2000']])
else:
raise Exception('Failed to infer RADEC for source: %s' % name)
self._init_spatial_pars(SpatialWidth=self['SpatialWidth'])
def __str__(self):
data = copy.deepcopy(self.data)
data['names'] = self.names
output = []
output += ['{:15s}:'.format('Name') + ' {name:s}']
output += ['{:15s}:'.format('Associations') + ' {names:s}']
output += ['{:15s}:'.format('RA/DEC') + ' {ra:10.3f}/{dec:10.3f}']
output += ['{:15s}:'.format('GLON/GLAT') +
' {glon:10.3f}/{glat:10.3f}']
output += ['{:15s}:'.format('TS') + ' {ts:.2f}']
output += ['{:15s}:'.format('Npred') + ' {npred:.2f}']
output += ['{:15s}:'.format('Flux') +
' {flux:9.4g} +/- {flux_err:8.3g}']
output += ['{:15s}:'.format('EnergyFlux') +
' {eflux:9.4g} +/- {eflux_err:8.3g}']
output += ['{:15s}:'.format('SpatialModel') + ' {SpatialModel:s}']
output += ['{:15s}:'.format('SpectrumType') + ' {SpectrumType:s}']
output += ['Spectral Parameters']
for i, p in enumerate(self['param_names']):
if not p:
break
val = self['param_values'][i]
err = self['param_errors'][i]
output += ['{:15s}: {:10.4g} +/- {:10.4g}'.format(p, val, err)]
return '\n'.join(output).format(**data)
def _set_radec(self, radec):
self['radec'] = np.array(radec, ndmin=1)
self['RAJ2000'] = radec[0]
self['DEJ2000'] = radec[1]
self['ra'] = radec[0]
self['dec'] = radec[1]
glonlat = utils.eq2gal(radec[0], radec[1])
self['glon'], self['glat'] = glonlat[0][0], glonlat[1][0]
if 'RA' in self.spatial_pars:
self.spatial_pars['RA']['value'] = radec[0]
self.spatial_pars['DEC']['value'] = radec[1]
def _set_spatial_width(self, spatial_width):
self.data['SpatialWidth'] = spatial_width
if self['SpatialType'] in ['RadialGaussian']:
self.spatial_pars['Sigma'][
'value'] = spatial_width / 1.5095921854516636
elif self['SpatialType'] in ['RadialDisk']:
self.spatial_pars['Radius'][
'value'] = spatial_width / 0.8246211251235321
def _init_spatial_pars(self, **kwargs):
spatial_pars = copy.deepcopy(kwargs)
spatial_width = spatial_pars.pop('SpatialWidth', None)
if self['SpatialType'] == 'SkyDirFunction':
self._extended = False
self._data['SourceType'] = 'PointSource'
else:
self._extended = True
self._data['SourceType'] = 'DiffuseSource'
spatial_pars.setdefault('RA', spatial_pars.pop('ra', self['ra']))
spatial_pars.setdefault('DEC', spatial_pars.pop('dec', self['dec']))
for k, v in spatial_pars.items():
if not isinstance(v, dict):
spatial_pars[k] = {'name': k, 'value': v}
if k in self.spatial_pars:
self.spatial_pars[k].update(spatial_pars[k])
if spatial_width is not None:
self._set_spatial_width(spatial_width)
elif self['SpatialType'] == 'RadialDisk':
self['SpatialWidth'] = self.spatial_pars[
'Radius']['value'] * 0.8246211251235321
elif self['SpatialType'] == 'RadialGaussian':
self['SpatialWidth'] = self.spatial_pars[
'Sigma']['value'] * 1.5095921854516636
if 'RA' in spatial_pars or 'DEC' in spatial_pars:
self._set_radec([spatial_pars['RA']['value'],
spatial_pars['DEC']['value']])
[docs] def update_data(self, d):
self._data = utils.merge_dict(self._data, d, add_new_keys=True)
if 'ra' in d and 'dec' in d:
self._set_radec([d['ra'], d['dec']])
[docs] def set_radec(self, ra, dec):
self._set_radec(np.array([ra, dec]))
[docs] def set_position(self, skydir):
"""
Set the position of the source.
Parameters
----------
skydir : `~astropy.coordinates.SkyCoord`
"""
if not isinstance(skydir, SkyCoord):
skydir = SkyCoord(ra=skydir[0], dec=skydir[1], unit=u.deg)
if not skydir.isscalar:
skydir = np.ravel(skydir)[0]
radec = np.array([skydir.icrs.ra.deg, skydir.icrs.dec.deg])
self._set_radec(radec)
[docs] def set_roi_direction(self, roidir):
offset = roidir.separation(self.skydir).deg
offset_cel = wcs_utils.sky_to_offset(
roidir, self['ra'], self['dec'], 'CEL')
offset_gal = wcs_utils.sky_to_offset(
roidir, self['glon'], self['glat'], 'GAL')
self['offset'] = offset
self['offset_ra'] = offset_cel[0, 0]
self['offset_dec'] = offset_cel[0, 1]
self['offset_glon'] = offset_gal[0, 0]
self['offset_glat'] = offset_gal[0, 1]
[docs] def set_roi_projection(self, proj):
if proj is None:
return
self['offset_roi_edge'] = proj.distance_to_edge(self.skydir)
[docs] def set_spatial_model(self, spatial_model, spatial_pars):
update_pars = False
if spatial_model != self['SpatialModel']:
update_pars = True
self._data['SpatialModel'] = spatial_model
self._data['SpatialType'] = get_spatial_type(self['SpatialModel'])
if update_pars:
self._data['spatial_pars'] = get_function_defaults(
self['SpatialType'])
if spatial_model == 'PointSource':
self._data['SpatialWidth'] = None
self._init_spatial_pars(**spatial_pars)
[docs] def separation(self, src):
if isinstance(src, Source):
return self.radec.separation(src.skydir)
else:
return self.radec.separation(src)
@property
def diffuse(self):
return False
@property
def extended(self):
return self._extended
@property
def associations(self):
return self._names
@property
def radec(self):
return self['radec']
@property
def skydir(self):
"""Return a SkyCoord representation of the source position.
Returns
-------
skydir : `~astropy.coordinates.SkyCoord`
"""
return SkyCoord(self.radec[0] * u.deg, self.radec[1] * u.deg)
@property
def data(self):
return self._data
@staticmethod
[docs] def create_from_dict(src_dict, roi_skydir=None, rescale=False):
"""Create a source object from a python dictionary.
Parameters
----------
src_dict : dict
Dictionary defining the properties of the source.
"""
src_dict = copy.deepcopy(src_dict)
src_dict.setdefault('SpatialModel', 'PointSource')
src_dict.setdefault('Spectrum_Filename', None)
src_dict.setdefault('SpectrumType', 'PowerLaw')
src_dict['SpatialType'] = get_spatial_type(src_dict['SpatialModel'])
spectrum_type = src_dict['SpectrumType']
spatial_type = src_dict['SpatialType']
spectral_pars = src_dict.pop('spectral_pars', {})
spatial_pars = src_dict.pop('spatial_pars', {})
if not spectral_pars:
spectral_pars = extract_pars_from_dict(spectrum_type, src_dict)
norm_par_name = get_function_norm_par_name(spectrum_type)
if norm_par_name is not None:
spectral_pars[norm_par_name].setdefault('free', True)
if not spatial_pars:
spatial_pars = extract_pars_from_dict(spatial_type, src_dict)
for k in ['RA', 'DEC', 'Prefactor']:
if k in spatial_pars:
del spatial_pars[k]
spectral_pars = create_pars_from_dict(spectrum_type, spectral_pars,
rescale)
spatial_pars = create_pars_from_dict(spatial_type, spatial_pars,
False)
if 'file' in src_dict:
src_dict['Spectrum_Filename'] = src_dict.pop('file')
if spectrum_type == 'DMFitFunction' and src_dict['Spectrum_Filename'] is None:
src_dict['Spectrum_Filename'] = os.path.join('$FERMIPY_DATA_DIR',
'gammamc_dif.dat')
src_dict['spectral_pars'] = cast_pars_dict(spectral_pars)
src_dict['spatial_pars'] = cast_pars_dict(spatial_pars)
if 'name' in src_dict:
name = src_dict['name']
src_dict['Source_Name'] = src_dict.pop('name')
elif 'Source_Name' in src_dict:
name = src_dict['Source_Name']
else:
raise Exception('Source name undefined.')
skydir = wcs_utils.get_target_skydir(src_dict, roi_skydir)
src_dict['RAJ2000'] = skydir.ra.deg
src_dict['DEJ2000'] = skydir.dec.deg
radec = np.array([skydir.ra.deg, skydir.dec.deg])
return Source(name, src_dict, radec=radec)
@staticmethod
[docs] def create_from_xmlfile(xmlfile, extdir=None):
"""Create a Source object from an XML file.
Parameters
----------
xmlfile : str
Path to XML file.
extdir : str
Path to the extended source archive.
"""
root = ElementTree.ElementTree(file=xmlfile).getroot()
srcs = root.findall('source')
if len(srcs) == 0:
raise Exception('No sources found.')
return Source.create_from_xml(srcs[0], extdir=extdir)
@staticmethod
[docs] def create_from_xml(root, extdir=None):
"""Create a Source object from an XML node.
Parameters
----------
root : `~xml.etree.ElementTree.Element`
XML node containing the source.
extdir : str
Path to the extended source archive.
"""
src_type = root.attrib['type']
spec = utils.load_xml_elements(root, 'spectrum')
spectral_pars = utils.load_xml_elements(root, 'spectrum/parameter')
spectral_type = spec['type']
spectral_pars = cast_pars_dict(spectral_pars)
spat = {}
spatial_pars = {}
nested_sources = []
if src_type == 'CompositeSource':
spatial_type = 'CompositeSource'
source_library = root.findall('source_library')[0]
for node in source_library.findall('source'):
nested_sources += [Source.create_from_xml(node, extdir=extdir)]
else:
spat = utils.load_xml_elements(root, 'spatialModel')
spatial_pars = utils.load_xml_elements(
root, 'spatialModel/parameter')
spatial_pars = cast_pars_dict(spatial_pars)
spatial_type = spat['type']
xml_dict = copy.deepcopy(root.attrib)
src_dict = {'catalog': xml_dict}
src_dict['Source_Name'] = xml_dict['name']
src_dict['SpectrumType'] = spectral_type
src_dict['SpatialType'] = spatial_type
src_dict['SourceType'] = src_type
src_dict['Spatial_Filename'] = None
src_dict['Spectrum_Filename'] = None
if 'file' in spat:
src_dict['Spatial_Filename'] = utils.xmlpath_to_path(spat['file'])
if not os.path.isfile(src_dict['Spatial_Filename']) \
and extdir is not None:
src_dict['Spatial_Filename'] = \
os.path.join(extdir, 'Templates',
src_dict['Spatial_Filename'])
if 'file' in spec:
src_dict['Spectrum_Filename'] = utils.xmlpath_to_path(spec['file'])
if src_type == 'PointSource':
src_dict['SpatialModel'] = 'PointSource'
elif src_type == 'CompositeSource':
src_dict['SpatialModel'] = 'CompositeSource'
elif spatial_type == 'SpatialMap':
src_dict['SpatialModel'] = 'SpatialMap'
else:
src_dict['SpatialModel'] = spatial_type
if src_type == 'PointSource' or \
spatial_type in ['SpatialMap', 'RadialGaussian', 'RadialDisk']:
if 'RA' in xml_dict:
src_dict['RAJ2000'] = float(xml_dict['RA'])
src_dict['DEJ2000'] = float(xml_dict['DEC'])
elif 'RA' in spatial_pars:
src_dict['RAJ2000'] = float(spatial_pars['RA']['value'])
src_dict['DEJ2000'] = float(spatial_pars['DEC']['value'])
else:
skydir = wcs_utils.get_map_skydir(os.path.expandvars(
src_dict['Spatial_Filename']))
src_dict['RAJ2000'] = skydir.ra.deg
src_dict['DEJ2000'] = skydir.dec.deg
radec = np.array([src_dict['RAJ2000'], src_dict['DEJ2000']])
src_dict['spectral_pars'] = spectral_pars
src_dict['spatial_pars'] = spatial_pars
return Source(src_dict['Source_Name'],
src_dict, radec=radec)
elif src_type == 'DiffuseSource' and spatial_type == 'ConstantValue':
return IsoSource(src_dict['Source_Name'],
{'Spectrum_Filename': spec['file'],
'spectral_pars': spectral_pars,
'spatial_pars': spatial_pars})
elif src_type == 'DiffuseSource' and spatial_type == 'MapCubeFunction':
return MapCubeSource(src_dict['Source_Name'],
{'Spatial_Filename': spat['file'],
'SpectrumType': spectral_type,
'spectral_pars': spectral_pars,
'spatial_pars': spatial_pars})
elif src_type == 'CompositeSource':
return CompositeSource(src_dict['Source_Name'],
{'SpectrumType': spectral_type,
'nested_sources': nested_sources})
else:
raise Exception(
'Unrecognized type for source: %s %s' % (src_dict['Source_Name'], src_type))
[docs] def write_xml(self, root):
"""Write this source to an XML node."""
if not self.extended:
source_element = utils.create_xml_element(root, 'source',
dict(name=self[
'Source_Name'],
type='PointSource'))
spat_el = ElementTree.SubElement(source_element, 'spatialModel')
spat_el.set('type', 'SkyDirFunction')
elif self['SpatialType'] == 'SpatialMap':
source_element = utils.create_xml_element(root, 'source',
dict(name=self[
'Source_Name'],
type='DiffuseSource'))
filename = utils.path_to_xmlpath(self['Spatial_Filename'])
spat_el = utils.create_xml_element(source_element, 'spatialModel',
dict(map_based_integral='True',
type='SpatialMap',
file=filename))
else:
source_element = utils.create_xml_element(root, 'source',
dict(name=self['Source_Name'],
type='DiffuseSource'))
spat_el = utils.create_xml_element(source_element, 'spatialModel',
dict(type=self['SpatialType']))
for k, v in self.spatial_pars.items():
utils.create_xml_element(spat_el, 'parameter', v)
el = ElementTree.SubElement(source_element, 'spectrum')
stype = self['SpectrumType'].strip()
el.set('type', stype)
if self['Spectrum_Filename'] is not None:
filename = utils.path_to_xmlpath(self['Spectrum_Filename'])
el.set('file', filename)
for k, v in self.spectral_pars.items():
utils.create_xml_element(el, 'parameter', v)
[docs]class CompositeSource(Model):
def __init__(self, name, data):
data.setdefault('SpectrumType', 'ConstantValue')
data['SpatialType'] = 'CompositeSource'
data['SpatialModel'] = 'CompositeSource'
data['SourceType'] = 'CompositeSource'
if not 'spectral_pars' in data:
data['spectral_pars'] = {
'Value': {'name': 'Value', 'scale': 1.0,
'value': 1.0, 'min': 0.1, 'max': '10.0',
'free': False},
}
super(CompositeSource, self).__init__(name, data)
self._build_nested_sources(data)
@property
def nested_sources(self):
return self._nested_sources
@property
def diffuse(self):
return True
def _build_nested_sources(self, data):
self._nested_sources = []
for nested_source in data.get('nested_sources', []):
if isinstance(nested_source, Model):
self._nested_sources.append(copy.deepcopy(nested_source))
elif isinstance(nested_source, dict):
self._nested_sources.append(
Source.create_from_dict(nested_source))
[docs] def write_xml(self, root):
source_element = utils.create_xml_element(root, 'source',
dict(name=self.name,
type='CompositeSource'))
spec_el = utils.create_xml_element(source_element, 'spectrum',
dict(type=self.data['SpectrumType']))
for k, v in self.spectral_pars.items():
utils.create_xml_element(spec_el, 'parameter', v)
spat_el = utils.create_xml_element(
source_element, 'source_library', dict(title=self.name))
for nested_source in self._nested_sources:
nested_source.write_xml(spat_el)
[docs]class ROIModel(fermipy.config.Configurable):
"""This class is responsible for managing the ROI model (both sources
and diffuse components). Source catalogs can be read
from either FITS or XML files. Individual components are
represented by instances of `~fermipy.roi_model.Model` and can be
accessed by name using the bracket operator.
* Create an ROI with all 3FGL sources and print a summary of its contents:
>>> skydir = astropy.coordinates.SkyCoord(0.0,0.0,unit='deg')
>>> roi = ROIModel({'catalogs' : ['3FGL'],'src_roiwidth' : 10.0},skydir=skydir)
>>> print(roi)
name SpatialModel SpectrumType offset ts npred
--------------------------------------------------------------------------------
3FGL J2357.3-0150 PointSource PowerLaw 1.956 nan 0.0
3FGL J0006.2+0135 PointSource PowerLaw 2.232 nan 0.0
3FGL J0016.3-0013 PointSource PowerLaw 4.084 nan 0.0
3FGL J0014.3-0455 PointSource PowerLaw 6.085 nan 0.0
* Print a summary of an individual source
>>> print(roi['3FGL J0006.2+0135'])
Name : 3FGL J0006.2+0135
Associations : ['3FGL J0006.2+0135']
RA/DEC : 1.572/ 1.585
GLON/GLAT : 100.400/ -59.297
TS : nan
Npred : nan
Flux : nan +/- nan
EnergyFlux : nan +/- nan
SpatialModel : PointSource
SpectrumType : PowerLaw
Spectral Parameters
Index : -2 +/- nan
Scale : 1000 +/- nan
Prefactor : 1e-12 +/- nan
* Get the SkyCoord for a source
>>> dir = roi['SourceA'].skydir
* Loop over all sources and print their names
>>> for s in roi.sources: print(s.name)
3FGL J2357.3-0150
3FGL J0006.2+0135
3FGL J0016.3-0013
3FGL J0014.3-0455
"""
defaults = dict(defaults.model.items(),
fileio=defaults.fileio)
src_name_cols = ['Source_Name',
'ASSOC', 'ASSOC1', 'ASSOC2', 'ASSOC_GAM',
'1FHL_Name', '2FGL_Name', '3FGL_Name',
'ASSOC_GAM1', 'ASSOC_GAM2', 'ASSOC_TEV']
def __init__(self, config=None, **kwargs):
# Coordinate for ROI center (defaults to 0,0)
self._skydir = kwargs.pop('skydir', SkyCoord(0.0, 0.0, unit=u.deg))
self._projection = kwargs.get('projection', None)
coordsys = kwargs.pop('coordsys', 'CEL')
srcname = kwargs.pop('srcname', None)
super(ROIModel, self).__init__(config, **kwargs)
if self.config['extdir'] is not None and \
not os.path.isdir(os.path.expandvars(self.config['extdir'])):
self._config['extdir'] = \
os.path.join('$FERMIPY_DATA_DIR',
'catalogs', self.config['extdir'])
self._src_radius = self.config['src_radius']
if self.config['src_roiwidth'] is not None:
self._config['src_radius_roi'] = self.config['src_roiwidth'] * 0.5
self._srcs = []
self._diffuse_srcs = []
self._src_dict = collections.defaultdict(list)
self._src_radius = []
self.load(coordsys=coordsys, srcname=srcname)
# def __getstate__(self):
# d = self.__dict__.copy()
# if 'logger' in d.keys():
# d['logger'] = d['logger'].name
# return d
# def __setstate__(self, d):
# if 'logger' in d.keys():
# d['logger'] = \
# Logger.get(self.__class__.__name__,
# d['_config']['logfile'],
# log_level(d['_config']['logging']['verbosity']))
#
# self.__dict__.update(d)
def __contains__(self, key):
key = key.replace(' ', '').lower()
return key in self._src_dict.keys()
def __getitem__(self, key):
return self.get_source_by_name(key)
def __iter__(self):
return iter(self._srcs + self._diffuse_srcs)
def __str__(self):
o = ''
o += '%-20s%-15s%-15s%8s%10s%12s\n' % (
'name', 'SpatialModel', 'SpectrumType', 'offset',
'ts', 'npred')
o += '-' * 80 + '\n'
for s in sorted(self.sources, key=lambda t: t['offset']):
if s.diffuse:
continue
o += '%-20.19s%-15.14s%-15.14s%8.3f%10.2f%12.1f\n' % (
s['name'], s['SpatialModel'],
s['SpectrumType'],
s['offset'], s['ts'], s['npred'])
for s in sorted(self.sources, key=lambda t: t['offset']):
if not s.diffuse:
continue
o += '%-20.19s%-15.14s%-15.14s%8s%10.2f%12.1f\n' % (
s['name'], s['SpatialModel'],
s['SpectrumType'],
'-----', s['ts'], s['npred'])
return o
@property
def skydir(self):
"""Return the sky direction corresponding to the center of the
ROI."""
return self._skydir
@property
def projection(self):
return self._projection
@property
def sources(self):
return self._srcs + self._diffuse_srcs
@property
def point_sources(self):
return self._srcs
@property
def diffuse_sources(self):
return self._diffuse_srcs
[docs] def set_projection(self, proj):
self._projection = proj
for s in self._srcs:
s.set_roi_projection(proj)
[docs] def clear(self):
"""Clear the contents of the ROI."""
self._srcs = []
self._diffuse_srcs = []
self._src_dict = collections.defaultdict(list)
self._src_radius = []
[docs] def load_diffuse_srcs(self):
self._load_diffuse_src('isodiff')
self._load_diffuse_src('galdiff')
self._load_diffuse_src('limbdiff')
self._load_diffuse_src('diffuse')
def _load_diffuse_src(self, name, src_type='FileFunction'):
if 'FERMI_DIR' in os.environ and 'FERMI_DIFFUSE_DIR' not in os.environ:
os.environ['FERMI_DIFFUSE_DIR'] = \
os.path.expandvars('$FERMI_DIR/refdata/fermi/galdiffuse')
search_dirs = []
search_dirs += self.config['diffuse_dir']
search_dirs += [self.config['fileio']['outdir'],
os.path.join('$FERMIPY_ROOT', 'data'),
'$FERMI_DIFFUSE_DIR']
srcs = []
if self.config[name] is not None:
srcs = self.config[name]
for i, t in enumerate(srcs):
if utils.isstr(t):
src_dict = {'file': t}
elif isinstance(t, dict):
src_dict = copy.deepcopy(t)
else:
raise Exception(
'Invalid type in diffuse mode list: %s' % str(type(t)))
src_dict['file'] = \
utils.resolve_file_path(src_dict['file'],
search_dirs=search_dirs)
if 'name' not in src_dict:
if len(srcs) == 1:
src_dict['name'] = name
else:
src_dict['name'] = name + '%02i' % i
if re.search(r'(\.txt$)', src_dict['file']):
src_type = 'FileFunction'
elif re.search(r'(\.fits$|\.fit$|\.fits.gz$|\.fit.gz$)',
src_dict['file']):
src_type = 'MapCubeFunction'
else:
raise Exception(
'Unrecognized file format for diffuse model: %s' % src_dict[
'file'])
# Extract here
if src_type == 'FileFunction':
src = IsoSource(src_dict['name'], {
'Spectrum_Filename': src_dict['file']})
altname = os.path.basename(src_dict['file'])
altname = re.sub(r'(\.txt$)', '', altname)
else:
src = MapCubeSource(src_dict['name'], {
'Spatial_Filename': src_dict['file']})
altname = os.path.basename(src_dict['file'])
altname = re.sub(r'(\.fits$|\.fit$|\.fits.gz$|\.fit.gz$)',
'', altname)
src.add_name(altname)
self.load_source(src, False, self.config['merge_sources'])
[docs] def create_source(self, name, src_dict, build_index=True,
merge_sources=True, rescale=True):
"""Add a new source to the ROI model from a dictionary or an
existing source object.
Parameters
----------
name : str
src_dict : dict or `~fermipy.roi_model.Source`
Returns
-------
src : `~fermipy.roi_model.Source`
"""
src_dict = copy.deepcopy(src_dict)
if isinstance(src_dict, dict):
src_dict['name'] = name
src = Model.create_from_dict(src_dict, self.skydir,
rescale=rescale)
else:
src = src_dict
src.set_name(name)
if isinstance(src, Source):
src.set_roi_direction(self.skydir)
src.set_roi_projection(self.projection)
self.load_source(src, build_index=build_index,
merge_sources=merge_sources)
return self.get_source_by_name(name)
[docs] def copy_source(self, name):
src = self.get_source_by_name(name)
return copy.deepcopy(src)
[docs] def load_sources(self, sources):
"""Delete all sources in the ROI and load the input source list."""
self.clear()
for s in sources:
if isinstance(s, dict):
s = Model.create_from_dict(s)
self.load_source(s, build_index=False)
self._build_src_index()
def _add_source_alias(self, name, src):
if src not in self._src_dict[name]:
self._src_dict[name] += [src]
[docs] def load_source(self, src, build_index=True, merge_sources=True,
**kwargs):
"""
Load a single source.
Parameters
----------
src : `~fermipy.roi_model.Source`
Source object that will be added to the ROI.
merge_sources : bool
When a source matches an existing source in the model
update that source with the properties of the new source.
build_index : bool
Re-make the source index after loading this source.
"""
src = copy.deepcopy(src)
name = src.name.replace(' ', '').lower()
min_sep = kwargs.get('min_separation', None)
if min_sep is not None:
sep = src.skydir.separation(self._src_skydir).deg
if len(sep) > 0 and np.min(sep) < min_sep:
return
match_srcs = self.match_source(src)
if len(match_srcs) == 1:
# self.logger.debug('Found matching source for %s : %s',
# src.name, match_srcs[0].name)
if merge_sources:
match_srcs[0].update_from_source(src)
else:
match_srcs[0].add_name(src.name)
self._add_source_alias(src.name.replace(' ', '').lower(),
match_srcs[0])
return
elif len(match_srcs) > 2:
raise Exception('Multiple sources with name %s' % name)
self._add_source_alias(src.name, src)
for name in src.names:
self._add_source_alias(name.replace(' ', '').lower(), src)
if isinstance(src, Source):
self._srcs.append(src)
else:
self._diffuse_srcs.append(src)
if build_index:
self._build_src_index()
[docs] def match_source(self, src):
"""Look for source or sources in the model that match the
given source. Sources are matched by name and any association
columns defined in the assoc_xmatch_columns parameter.
"""
srcs = []
names = [src.name]
for col in self.config['assoc_xmatch_columns']:
if col in src.assoc and src.assoc[col]:
names += [src.assoc[col]]
for name in names:
name = name.replace(' ', '').lower()
if name not in self._src_dict:
continue
srcs += [s for s in self._src_dict[name] if s not in srcs]
return srcs
[docs] def load(self, **kwargs):
"""Load both point source and diffuse components."""
coordsys = kwargs.get('coordsys', 'CEL')
extdir = kwargs.get('extdir', self.config['extdir'])
srcname = kwargs.get('srcname', None)
self.clear()
self.load_diffuse_srcs()
for c in self.config['catalogs']:
if isinstance(c, catalog.Catalog):
self.load_existing_catalog(c)
continue
extname = os.path.splitext(c)[1]
if extname != '.xml':
self.load_fits_catalog(c, extdir=extdir, coordsys=coordsys,
srcname=srcname)
elif extname == '.xml':
self.load_xml(c, extdir=extdir, coordsys=coordsys)
else:
raise Exception('Unrecognized catalog file extension: %s' % c)
for c in self.config['sources']:
if 'name' not in c:
raise Exception(
'No name field in source dictionary:\n ' + str(c))
self.create_source(c['name'], c, build_index=False)
self._build_src_index()
[docs] def delete_sources(self, srcs):
for k, v in self._src_dict.items():
for s in srcs:
if s in v:
self._src_dict[k].remove(s)
if not v:
del self._src_dict[k]
self._srcs = [s for s in self._srcs if s not in srcs]
self._diffuse_srcs = [s for s in self._diffuse_srcs if s not in srcs]
self._build_src_index()
@staticmethod
[docs] def create_from_roi_data(datafile):
"""Create an ROI model."""
data = np.load(datafile).flat[0]
roi = ROIModel()
roi.load_sources(data['sources'].values())
return roi
@staticmethod
[docs] def create(selection, config, **kwargs):
"""Create an ROIModel instance."""
if selection['target'] is not None:
return ROIModel.create_from_source(selection['target'],
config, **kwargs)
else:
target_skydir = wcs_utils.get_target_skydir(selection)
return ROIModel.create_from_position(target_skydir,
config, **kwargs)
@staticmethod
[docs] def create_from_position(skydir, config, **kwargs):
"""Create an ROIModel instance centered on a sky direction.
Parameters
----------
skydir : `~astropy.coordinates.SkyCoord`
Sky direction on which the ROI will be centered.
config : dict
Model configuration dictionary.
"""
coordsys = kwargs.pop('coordsys', 'CEL')
roi = ROIModel(config, skydir=skydir, coordsys=coordsys, **kwargs)
return roi
@staticmethod
[docs] def create_from_source(name, config, **kwargs):
"""Create an ROI centered on the given source."""
coordsys = kwargs.pop('coordsys', 'CEL')
roi = ROIModel(config, src_radius=None, src_roiwidth=None,
srcname=name, **kwargs)
src = roi.get_source_by_name(name)
return ROIModel.create_from_position(src.skydir, config,
coordsys=coordsys, **kwargs)
@staticmethod
[docs] def create_roi_from_ft1(ft1file, config):
"""Create an ROI model by extracting the sources coordinates
form an FT1 file."""
pass
[docs] def has_source(self, name):
index_name = name.replace(' ', '').lower()
if index_name in self._src_dict:
return True
else:
return False
[docs] def get_source_by_name(self, name):
"""Return a single source in the ROI with the given name. The
input name string can match any of the strings in the names
property of the source object. Case and whitespace are
ignored when matching name strings. If no sources are found
or multiple sources then an exception is thrown.
Parameters
----------
name : str
Name string.
Returns
-------
srcs : `~fermipy.roi_model.Model`
A source object.
"""
srcs = self.get_sources_by_name(name)
if len(srcs) == 1:
return srcs[0]
elif len(srcs) == 0:
raise Exception('No source matching name: ' + name)
elif len(srcs) > 1:
raise Exception('Multiple sources matching name: ' + name)
[docs] def get_sources_by_name(self, name):
"""Return a list of sources in the ROI matching the given
name. The input name string can match any of the strings in
the names property of the source object. Case and whitespace
are ignored when matching name strings.
Parameters
----------
name : str
Returns
-------
srcs : list
A list of `~fermipy.roi_model.Model` objects.
"""
index_name = name.replace(' ', '').lower()
if index_name in self._src_dict:
return list(self._src_dict[index_name])
else:
raise Exception('No source matching name: ' + name)
[docs] def get_nearby_sources(self, name, distance, min_dist=None,
square=False):
src = self.get_source_by_name(name)
return self.get_sources_by_position(src.skydir,
distance, min_dist,
square)
[docs] def get_sources(self, skydir=None, distance=None, cuts=None,
minmax_ts=None, minmax_npred=None,
exclude=None, square=False, coordsys='CEL'):
"""Retrieve list of source objects satisfying the following
selections:
* Angular separation from ``skydir`` or ROI center (if
``skydir`` is None) less than ``distance``.
* Cuts on source properties defined in ``cuts`` list.
* TS and Npred in range specified by ``minmax_ts`` and ``minmax_npred``.
Sources can be excluded from the selection by adding their
name to the ``exclude`` list.
Returns
-------
srcs : list
List of source objects.
"""
if skydir is None:
skydir = self.skydir
if exclude is None:
exclude = []
rsrc, srcs = self.get_sources_by_position(skydir,
distance,
square=square,
coordsys=coordsys)
o = []
for s in srcs + self.diffuse_sources:
if s.name in exclude:
continue
if not s.check_cuts(cuts):
continue
ts = s['ts']
npred = s['npred']
if not utils.apply_minmax_selection(ts, minmax_ts):
continue
if not utils.apply_minmax_selection(npred, minmax_npred):
continue
o.append(s)
return o
[docs] def get_sources_by_property(self, pname, pmin, pmax=None):
srcs = []
for i, s in enumerate(self._srcs):
if pname not in s:
continue
if pmin is not None and s[pname] < pmin:
continue
if pmax is not None and s[pname] > pmax:
continue
srcs.append(s)
return srcs
[docs] def get_sources_by_position(self, skydir, dist, min_dist=None,
square=False, coordsys='CEL'):
"""Retrieve sources within a certain angular distance of a sky
coordinate. This function supports two types of geometric
selections: circular (square=False) and square (square=True).
The circular selection finds all sources with a given angular
distance of the target position. The square selection finds
sources within an ROI-like region of size R x R where R = 2 x
dist.
Parameters
----------
skydir : `~astropy.coordinates.SkyCoord`
Sky direction with respect to which the selection will be applied.
dist : float
Maximum distance in degrees from the sky coordinate.
square : bool
Choose whether to apply a circular or square selection.
coordsys : str
Coordinate system to use when applying a selection with square=True.
"""
msk = get_skydir_distance_mask(self._src_skydir, skydir, dist,
min_dist=min_dist, square=square,
coordsys=coordsys)
radius = self._src_skydir.separation(skydir).deg
radius = radius[msk]
srcs = [self._srcs[i] for i in np.nonzero(msk)[0]]
isort = np.argsort(radius)
radius = radius[isort]
srcs = [srcs[i] for i in isort]
return radius, srcs
[docs] def load_fits_catalog(self, name, **kwargs):
"""Load sources from a FITS catalog file.
Parameters
----------
name : str
Catalog name or path to a catalog FITS file.
"""
# EAC split this function to make it easier to load an existing catalog
cat = catalog.Catalog.create(name)
self.load_existing_catalog(cat, **kwargs)
[docs] def load_existing_catalog(self, cat, **kwargs):
"""Load sources from an existing catalog object.
Parameters
----------
cat : `~fermipy.catalog.Catalog`
Catalog object.
"""
coordsys = kwargs.get('coordsys', 'CEL')
extdir = kwargs.get('extdir', self.config['extdir'])
srcname = kwargs.get('srcname', None)
m0 = get_skydir_distance_mask(cat.skydir, self.skydir,
self.config['src_radius'])
m1 = get_skydir_distance_mask(cat.skydir, self.skydir,
self.config['src_radius_roi'],
square=True, coordsys=coordsys)
m = (m0 & m1)
if srcname is not None:
m &= utils.find_rows_by_string(cat.table, [srcname],
self.src_name_cols)
offset = self.skydir.separation(cat.skydir).deg
offset_cel = wcs_utils.sky_to_offset(self.skydir,
cat.radec[:, 0], cat.radec[:, 1],
'CEL')
offset_gal = wcs_utils.sky_to_offset(self.skydir,
cat.glonlat[
:, 0], cat.glonlat[:, 1],
'GAL')
for i, (row, radec) in enumerate(zip(cat.table[m],
cat.radec[m])):
catalog_dict = catalog.row_to_dict(row)
src_dict = {'catalog': catalog_dict}
src_dict['Source_Name'] = row['Source_Name']
src_dict['SpectrumType'] = row['SpectrumType']
if row['extended']:
src_dict['SourceType'] = 'DiffuseSource'
src_dict['SpatialType'] = 'SpatialMap'
src_dict['SpatialModel'] = 'SpatialMap'
search_dirs = []
if extdir is not None:
search_dirs += [extdir, os.path.join(extdir, 'Templates')]
search_dirs += [row['extdir'],
os.path.join(row['extdir'], 'Templates')]
src_dict['Spatial_Filename'] = utils.resolve_file_path(
row['Spatial_Filename'],
search_dirs=search_dirs)
else:
src_dict['SourceType'] = 'PointSource'
src_dict['SpatialType'] = 'SkyDirFunction'
src_dict['SpatialModel'] = 'PointSource'
src_dict['spectral_pars'] = spectral_pars_from_catalog(
catalog_dict)
src = Source(src_dict['Source_Name'], src_dict, radec=radec)
src.data['offset'] = offset[m][i]
src.data['offset_ra'] = offset_cel[:, 0][m][i]
src.data['offset_dec'] = offset_cel[:, 1][m][i]
src.data['offset_glon'] = offset_gal[:, 0][m][i]
src.data['offset_glat'] = offset_gal[:, 1][m][i]
self.load_source(src, False,
merge_sources=self.config['merge_sources'])
self._build_src_index()
[docs] def load_xml(self, xmlfile, **kwargs):
"""Load sources from an XML file."""
extdir = kwargs.get('extdir', self.config['extdir'])
coordsys = kwargs.get('coordsys', 'CEL')
if not os.path.isfile(xmlfile):
xmlfile = os.path.join(fermipy.PACKAGE_DATA, 'catalogs', xmlfile)
root = ElementTree.ElementTree(file=xmlfile).getroot()
diffuse_srcs = []
srcs = []
ra, dec = [], []
for s in root.findall('source'):
src = Source.create_from_xml(s, extdir=extdir)
if src.diffuse:
diffuse_srcs += [src]
else:
srcs += [src]
ra += [src['RAJ2000']]
dec += [src['DEJ2000']]
src_skydir = SkyCoord(ra=np.array(ra) * u.deg,
dec=np.array(dec) * u.deg)
radec = np.vstack((src_skydir.ra.deg, src_skydir.dec.deg)).T
glonlat = np.vstack((src_skydir.galactic.l.deg,
src_skydir.galactic.b.deg)).T
offset = self.skydir.separation(src_skydir).deg
offset_cel = wcs_utils.sky_to_offset(self.skydir,
radec[:, 0], radec[:, 1], 'CEL')
offset_gal = wcs_utils.sky_to_offset(self.skydir,
glonlat[:, 0], glonlat[:, 1], 'GAL')
m0 = get_skydir_distance_mask(src_skydir, self.skydir,
self.config['src_radius'])
m1 = get_skydir_distance_mask(src_skydir, self.skydir,
self.config['src_radius_roi'],
square=True, coordsys=coordsys)
m = (m0 & m1)
srcs = np.array(srcs)[m]
for i, s in enumerate(srcs):
s.data['offset'] = offset[m][i]
s.data['offset_ra'] = offset_cel[:, 0][m][i]
s.data['offset_dec'] = offset_cel[:, 1][m][i]
s.data['offset_glon'] = offset_gal[:, 0][m][i]
s.data['offset_glat'] = offset_gal[:, 1][m][i]
self.load_source(s, False,
merge_sources=self.config['merge_sources'])
for i, s in enumerate(diffuse_srcs):
self.load_source(s, False,
merge_sources=self.config['merge_sources'])
self._build_src_index()
def _build_src_index(self):
"""Build an indices for fast lookup of a source given its name
or coordinates."""
self._srcs = sorted(self._srcs, key=lambda t: t['offset'])
nsrc = len(self._srcs)
radec = np.zeros((2, nsrc))
for i, src in enumerate(self._srcs):
radec[:, i] = src.radec
self._src_skydir = SkyCoord(ra=radec[0], dec=radec[1], unit=u.deg)
self._src_radius = self._src_skydir.separation(self.skydir)
[docs] def write_xml(self, xmlfile):
"""Save the ROI model as an XML file."""
root = ElementTree.Element('source_library')
root.set('title', 'source_library')
for s in self._srcs:
s.write_xml(root)
for s in self._diffuse_srcs:
s.write_xml(root)
output_file = open(xmlfile, 'w')
output_file.write(utils.prettify_xml(root))
[docs] def create_source_table(self):
cols_dict = collections.OrderedDict()
cols_dict['source_name'] = dict(dtype='S48', format='%s')
cols_dict['spectrum_type'] = dict(dtype='S48', format='%s')
cols_dict['spatialModel_type'] = dict(dtype='S48', format='%s')
cols_dict['spectrum_file'] = dict(dtype='S256', format='%s')
cols_dict['spatialModel_file'] = dict(dtype='S256', format='%s')
cols = [Column(name=k, **v) for k, v in cols_dict.items()]
tab = Table(cols)
row_dict = {}
for s in self.sources:
row_dict['source_name'] = s.name
row_dict['spectrum_type'] = s['SpectrumType']
row_dict['spatialModel_type'] = s['SpatialType']
row_dict['spectrum_file'] = s['Spectrum_Filename']
row_dict['spatialModel_file'] = s['Spatial_Filename']
tab.add_row([row_dict[k] for k in tab.columns])
return tab
[docs] def create_param_table(self):
cols_dict = collections.OrderedDict()
cols_dict['source_name'] = dict(dtype='S48', format='%s')
cols_dict['name'] = dict(dtype='S48', format='%s')
cols_dict['group'] = dict(dtype='S48', format='%s')
cols_dict['type'] = dict(dtype='S48', format='%s')
cols_dict['value'] = dict(dtype='f8', format='%.3f')
cols_dict['error'] = dict(dtype='f8', format='%.3f')
cols_dict['scale'] = dict(dtype='f8', format='%.3f')
cols_dict['min'] = dict(dtype='f8', format='%.3f')
cols_dict['max'] = dict(dtype='f8', format='%.3f')
cols_dict['free'] = dict(dtype='bool')
cols = [Column(name=k, **v) for k, v in cols_dict.items()]
tab = Table(cols)
row_dict = {}
for s in self.sources:
row_dict['source_name'] = s.name
row_dict['type'] = s['SpectrumType']
row_dict['group'] = 'spectrum'
for k, v in s.spectral_pars.items():
row_dict['name'] = k
row_dict.update(v)
tab.add_row([row_dict[k] for k in tab.columns])
row_dict['type'] = s['SpatialType']
row_dict['group'] = 'spatialModel'
for k, v in s.spatial_pars.items():
row_dict['name'] = k
row_dict.update(v)
tab.add_row([row_dict[k] for k in tab.columns])
return tab
[docs] def create_table(self, names=None):
"""Create an astropy Table object with the contents of the ROI model.
"""
scan_shape = (1,)
for src in self._srcs:
scan_shape = max(scan_shape, src['dloglike_scan'].shape)
tab = create_source_table(scan_shape)
for s in self._srcs:
if names is not None and s.name not in names:
continue
s.add_to_table(tab)
return tab
[docs] def write_fits(self, fitsfile):
"""Write the ROI model to a FITS file."""
tab = self.create_table()
hdu_data = fits.table_to_hdu(tab)
hdus = [fits.PrimaryHDU(), hdu_data]
fits_utils.write_hdus(hdus, fitsfile)