from __future__ import absolute_import, division, print_function, \
unicode_literals
import os
import copy
import re
import collections
import numpy as np
import xml.etree.cElementTree as ElementTree
import pyLikelihood as pyLike
from astropy import units as u
from astropy.coordinates import SkyCoord
import astropy.io.fits as pyfits
from astropy.table import Table, Column
import fermipy
import fermipy.config
import fermipy.utils as utils
import fermipy.wcs_utils as wcs_utils
import fermipy.gtutils as gtutils
import fermipy.catalog as catalog
import fermipy.defaults as defaults
from fermipy.logger import Logger, log_level
[docs]def create_source_table(scan_shape):
"""Create an empty source table.
Returns
-------
tab : `~astropy.table.Table`
"""
cols_dict = collections.OrderedDict()
cols_dict['Source_Name'] = dict(dtype='S48', format='%s')
cols_dict['name'] = dict(dtype='S48', format='%s')
cols_dict['class'] = dict(dtype='S32', format='%s')
cols_dict['SpectrumType'] = dict(dtype='S32', format='%s')
cols_dict['SpatialType'] = dict(dtype='S32', format='%s')
cols_dict['SourceType'] = dict(dtype='S32', format='%s')
cols_dict['SpatialModel'] = dict(dtype='S32', format='%s')
cols_dict['RAJ2000'] = dict(dtype='f8', format='%.3f',unit='deg')
cols_dict['DEJ2000'] = dict(dtype='f8', format='%.3f',unit='deg')
cols_dict['GLON'] = dict(dtype='f8', format='%.3f',unit='deg')
cols_dict['GLAT'] = dict(dtype='f8', format='%.3f',unit='deg')
cols_dict['ts'] = dict(dtype='f8', format='%.3f')
cols_dict['loglike'] = dict(dtype='f8', format='%.3f')
cols_dict['npred'] = dict(dtype='f8', format='%.3f')
cols_dict['offset'] = dict(dtype='f8', format='%.3f',unit='deg')
cols_dict['offset_ra'] = dict(dtype='f8', format='%.3f',unit='deg')
cols_dict['offset_dec'] = dict(dtype='f8', format='%.3f',unit='deg')
cols_dict['offset_glon'] = dict(dtype='f8', format='%.3f',unit='deg')
cols_dict['offset_glat'] = dict(dtype='f8', format='%.3f',unit='deg')
cols_dict['offset_roi_edge'] = dict(dtype='f8', format='%.3f',unit='deg')
cols_dict['pivot_energy'] = dict(dtype='f8', format='%.3f',unit='MeV')
cols_dict['flux_scan'] = dict(dtype='f8', format='%.3f',
shape=scan_shape)
cols_dict['eflux_scan'] = dict(dtype='f8', format='%.3f',
shape=scan_shape)
cols_dict['dloglike_scan'] = dict(dtype='f8', format='%.3f',
shape=scan_shape)
# Add source dictionary columns
for k, v in sorted(defaults.source_output.items()):
if not k in cols_dict.keys():
if v[2] == float:
cols_dict[k] = dict(dtype='f8', format='%f')
elif v[2] == str:
cols_dict[k] = dict(dtype='S32', format='%s')
cols_dict['param_names'] = dict(dtype='S32', format='%s',shape=(6,))
cols_dict['param_values'] = dict(dtype='f8', format='%f',shape=(6,))
cols_dict['param_errors'] = dict(dtype='f8', format='%f',shape=(6,))
# Catalog Parameters
cols_dict['Flux_Density'] = dict(dtype='f8', format='%.5g',unit='1 / (MeV cm2 s)')
cols_dict['Spectral_Index'] = dict(dtype='f8', format='%.3f')
cols_dict['Pivot_Energy'] = dict(dtype='f8', format='%.3f',unit='MeV')
cols_dict['beta'] = dict(dtype='f8', format='%.3f')
cols_dict['Exp_Index'] = dict(dtype='f8', format='%.3f')
cols_dict['Cutoff'] = dict(dtype='f8', format='%.3f',unit='MeV')
cols_dict['Conf_68_PosAng'] = dict(dtype='f8', format='%.3f',unit='deg')
cols_dict['Conf_68_SemiMajor'] = dict(dtype='f8', format='%.3f',unit='deg')
cols_dict['Conf_68_SemiMinor'] = dict(dtype='f8', format='%.3f',unit='deg')
cols_dict['Conf_95_PosAng'] = dict(dtype='f8', format='%.3f',unit='deg')
cols_dict['Conf_95_SemiMajor'] = dict(dtype='f8', format='%.3f',unit='deg')
cols_dict['Conf_95_SemiMinor'] = dict(dtype='f8', format='%.3f',unit='deg')
for t in ['eflux','eflux100','eflux1000','eflux10000']:
cols_dict[t] = dict(dtype='f8', format='%.3f',unit='MeV / (cm2 s)',shape=(2,))
for t in ['eflux_ul95','eflux100_ul95','eflux1000_ul95','eflux10000_ul95']:
cols_dict[t] = dict(dtype='f8', format='%.3f',unit='MeV / (cm2 s)')
for t in ['flux','flux100','flux1000','flux10000']:
cols_dict[t] = dict(dtype='f8', format='%.3f',unit='1 / (cm2 s)',shape=(2,))
for t in ['flux_ul95','flux100_ul95','flux1000_ul95','flux10000_ul95']:
cols_dict[t] = dict(dtype='f8', format='%.3f',unit='1 / (cm2 s)')
for t in ['dfde','dfde100','dfde1000','dfde10000']:
cols_dict[t] = dict(dtype='f8', format='%.3f',unit='1 / (MeV cm2 s)',shape=(2,))
cols = [Column(name=k, **v) for k,v in cols_dict.items()]
tab = Table(cols)
return tab
[docs]def resolve_file_path(path, **kwargs):
dirs = kwargs.get('search_dirs', [])
if os.path.isabs(os.path.expandvars(path)) and \
os.path.isfile(os.path.expandvars(path)):
return path
for d in dirs:
if not os.path.isdir(os.path.expandvars(d)):
continue
p = os.path.join(d, path)
if os.path.isfile(os.path.expandvars(p)):
return p
raise Exception('Failed to resolve file path: %s' % path)
[docs]def get_skydir_distance_mask(src_skydir, skydir, dist, min_dist=None,
square=False, coordsys='CEL'):
"""Retrieve sources within a certain angular distance of an
(ra,dec) coordinate. This function supports two types of
geometric selections: circular (square=False) and square
(square=True). The circular selection finds all sources with a given
angular distance of the target position. The square selection
finds sources within an ROI-like region of size R x R where R
= 2 x dist.
Parameters
----------
src_skydir : `~astropy.coordinates.SkyCoord`
Array of sky directions.
skydir : `~astropy.coordinates.SkyCoord`
Sky direction with respect to which the selection will be applied.
dist : float
Maximum distance in degrees from the sky coordinate.
square : bool
Choose whether to apply a circular or square selection.
coordsys : str
Coordinate system to use when applying a selection with square=True.
"""
if dist is None: dist = 180.
if not square:
dtheta = src_skydir.separation(skydir).rad
elif coordsys == 'CEL':
dtheta = get_linear_dist(skydir,
src_skydir.ra.rad,
src_skydir.dec.rad,
coordsys=coordsys)
elif coordsys == 'GAL':
dtheta = get_linear_dist(skydir,
src_skydir.galactic.l.rad,
src_skydir.galactic.b.rad,
coordsys=coordsys)
else:
raise Exception('Unrecognized coordinate system: %s' % coordsys)
msk = (dtheta < np.radians(dist))
if min_dist is not None: msk &= (dtheta > np.radians(min_dist))
return msk
[docs]def get_linear_dist(skydir, lon, lat, coordsys='CEL'):
xy = wcs_utils.sky_to_offset(skydir, np.degrees(lon), np.degrees(lat),
coordsys=coordsys)
x = np.radians(xy[:, 0])
y = np.radians(xy[:, 1])
delta = np.array([np.abs(x), np.abs(y)])
dtheta = np.max(delta, axis=0)
return dtheta
[docs]def get_dist_to_edge(skydir, lon, lat, width, coordsys='CEL'):
xy = wcs_utils.sky_to_offset(skydir, np.degrees(lon), np.degrees(lat),
coordsys=coordsys)
x = np.radians(xy[:, 0])
y = np.radians(xy[:, 1])
delta_edge = np.array([np.abs(x) - width, np.abs(y) - width])
dtheta = np.max(delta_edge, axis=0)
return dtheta
[docs]def get_params_dict(pars_dict):
params = {}
for k, p in pars_dict.items():
val = p['value']*p['scale']
err = np.nan
if 'error' in p:
err = p['error']*np.abs(p['scale'])
params[k]=np.array([val,err])
return params
[docs]class Model(object):
"""Base class for source objects. This class is a container for both
spectral and spatial parameters as well as other source properties
such as TS, Npred, and location within the ROI.
"""
def __init__(self, name, data=None):
self._data = defaults.make_default_dict(defaults.source_output)
self._data.setdefault('spectral_pars', {})
self._data.setdefault('spatial_pars', {})
self._data.setdefault('catalog', {})
self._data['assoc'] = {}
self._data['name'] = name
if data is not None:
self._data.update(data)
if not self.spectral_pars:
pdict = gtutils.get_function_defaults(self['SpectrumType'])
self._data['spectral_pars'] = pdict
for k, v in self.spectral_pars.items():
self._data['spectral_pars'][k] = gtutils.make_parameter_dict(v)
self._names = [name]
catalog = self._data['catalog']
for k in ROIModel.src_name_cols:
if k not in catalog:
continue
name = catalog[k].strip()
if name != '' and name not in self._names:
self._names.append(name)
self._data['assoc'][k] = name
if self.params:
self._sync_spectral_pars()
else:
self._sync_params()
def __contains__(self, key):
return key in self._data
def __getitem__(self, key):
return self._data[key]
def __setitem__(self, key, value):
self._data[key] = value
def __eq__(self, other):
return self.name == other.name
def __str__(self):
data = copy.deepcopy(self.data)
data['names'] = self.names
output = []
output += ['{:15s}:'.format('Name') + ' {name:s}']
output += ['{:15s}:'.format('TS') + ' {ts:.2f}']
output += ['{:15s}:'.format('Npred') + ' {npred:.2f}']
output += ['{:15s}:'.format('SpatialModel') + ' {SpatialModel:s}']
output += ['{:15s}:'.format('SpectrumType') + ' {SpectrumType:s}']
output += ['Spectral Parameters']
for k, v in self['params'].items():
if isinstance(v, np.ndarray):
output += [
'{:15s}: {:10.4g} +/- {:10.4g}'.format(k, v[0], v[1])]
return '\n'.join(output).format(**data)
[docs] def items(self):
return self._data.items()
@property
def data(self):
return self._data
@property
def params(self):
return self._data['params']
@property
def spectral_pars(self):
return self._data['spectral_pars']
@property
def spatial_pars(self):
return self._data['spatial_pars']
@property
def name(self):
return self._data['name']
@property
def names(self):
return self._names
@property
def assoc(self):
return self._data['assoc']
@staticmethod
[docs] def create_from_dict(src_dict, roi_skydir=None):
src_dict.setdefault('SpatialModel','PointSource')
src_dict.setdefault('SpatialType',
gtutils.get_spatial_type(src_dict['SpatialModel']))
# Need this to handle old conventions for
# MapCubeFunction/ConstantValue sources
if src_dict['SpatialModel'] == 'DiffuseSource':
src_dict['SpatialModel'] = src_dict['SpatialType']
if 'filefunction' in src_dict:
src_dict['Spectrum_Filename'] = src_dict.pop('filefunction')
if 'mapcube' in src_dict:
src_dict['Spatial_Filename'] = src_dict.pop('mapcube')
if 'spectral_pars' in src_dict:
src_dict['spectral_pars'] = gtutils.cast_pars_dict(src_dict['spectral_pars'])
if 'spatial_pars' in src_dict:
src_dict['spatial_pars'] = gtutils.cast_pars_dict(src_dict['spatial_pars'])
if src_dict['SpatialModel'] == 'ConstantValue':
return IsoSource(src_dict['name'],src_dict)
elif src_dict['SpatialModel'] == 'MapCubeFunction':
return MapCubeSource(src_dict['name'],src_dict)
else:
return Source.create_from_dict(src_dict,roi_skydir)
def _sync_spectral_pars(self):
"""Update spectral parameters dictionary."""
sp = self['spectral_pars']
for k, p in sp.items():
sp[k]['value'] = self['params'][k][0]/sp[k]['scale']
if np.isfinite(self['params'][k][1]):
sp[k]['error'] = self['params'][k][1]/np.abs(sp[k]['scale'])
sp[k] = gtutils.make_parameter_dict(sp[k])
def _sync_params(self):
self._data['params'] = get_params_dict(self['spectral_pars'])
[docs] def get_norm(self):
par_name = gtutils.get_function_norm_par_name(self['SpectrumType'])
val = self.spectral_pars[par_name]['value']
scale = self.spectral_pars[par_name]['scale']
return float(val)*float(scale)
[docs] def get_catalog_dict(self):
o = {'Spectral_Index' : np.nan,
'Flux_Density' : np.nan,
'Pivot_Energy' : np.nan,
'beta' : np.nan,
'Exp_Index' : np.nan,
'Cutoff' : np.nan}
if self['SpectrumType'] == 'PowerLaw':
o['Spectral_Index'] = -1.0*self.params['Index'][0]
o['Flux_Density'] = self.params['Prefactor'][0]
o['Pivot_Energy'] = self.params['Scale'][0]
elif self['SpectrumType'] == 'LogParabola':
o['Spectral_Index'] = self.params['alpha'][0]
o['Flux_Density'] = self.params['norm'][0]
o['Pivot_Energy'] = self.params['Eb'][0]
o['beta'] = self.params['beta'][0]
elif self['SpectrumType'] == 'PLSuperExpCutoff':
o['Spectral_Index'] = -self.params['Index1'][0]
o['Exp_Index'] = self.params['Index2'][0]
o['Flux_Density'] = self.params['Prefactor'][0]
o['Pivot_Energy'] = self.params['Scale'][0]
o['Cutoff'] = self.params['Cutoff'][0]
return o
[docs] def check_cuts(self, cuts):
if cuts is None:
return True
if isinstance(cuts, tuple):
cuts = {cuts[0]: (cuts[1], cuts[2])}
elif isinstance(cuts, list):
cuts = {c[0]: (c[1], c[2]) for c in cuts}
for k, v in cuts.items():
# if not isinstance(c,tuple) or len(c) != 3:
# raise Exception('Wrong format for cuts tuple.')
if k in self._data:
if not utils.apply_minmax_selection(self[k], v):
return False
elif 'catalog' in self._data and k in self._data['catalog']:
if not utils.apply_minmax_selection(self['catalog'][k], v):
return False
else:
return False
return True
[docs] def set_spectral_pars(self,spectral_pars):
self._data['spectral_pars'] = copy.deepcopy(spectral_pars)
self._sync_params()
[docs] def update_spectral_pars(self,spectral_pars):
self._data['spectral_pars'] = utils.merge_dict(self.spectral_pars,spectral_pars)
self._sync_params()
[docs] def set_name(self, name, names=None):
self._data['name'] = name
if names is None:
self._names = [name]
else:
self._names = names
[docs] def add_name(self, name):
if name not in self._names:
self._names.append(name)
[docs] def update_data(self, d):
self._data = utils.merge_dict(self._data, d, add_new_keys=True)
#if self.params:
# self._sync_spectral_pars()
[docs] def update_from_source(self, src):
self._data['spectral_pars'] = {}
self._data['spatial_pars'] = {}
self._data = utils.merge_dict(self.data, src.data, add_new_keys=True)
self._name = src.name
self._names = list(set(self._names + src.names))
[docs]class IsoSource(Model):
def __init__(self, name, data):
data['SpectrumType'] = 'FileFunction'
data['SpatialType'] = 'ConstantValue'
data['SpatialModel'] = 'ConstantValue'
data['SourceType'] = 'DiffuseSource'
if not 'spectral_pars' in data:
data['spectral_pars'] = {
'Normalization': {'name': 'Normalization', 'scale': 1.0,
'value': 1.0,
'min': 0.001, 'max': 1000.0,
'free': False }}
super(IsoSource, self).__init__(name, data)
self._init_spatial_pars()
@property
def filefunction(self):
return self._data['Spectrum_Filename']
@property
def diffuse(self):
return True
def _init_spatial_pars(self):
self['spatial_pars'] = {
'Value': {'name': 'Value', 'scale': '1',
'value': '1', 'min': '0', 'max': '10',
'free': '0'}}
[docs] def write_xml(self, root):
source_element = utils.create_xml_element(root, 'source',
dict(name=self.name,
type='DiffuseSource'))
filename = utils.path_to_xmlpath(self.filefunction)
spec_el = utils.create_xml_element(source_element, 'spectrum',
dict(file=filename,
type='FileFunction',
ctype='-1'))
spat_el = utils.create_xml_element(source_element, 'spatialModel',
dict(type='ConstantValue'))
for k, v in self.spectral_pars.items():
utils.create_xml_element(spec_el, 'parameter', v)
for k, v in self.spatial_pars.items():
utils.create_xml_element(spat_el, 'parameter', v)
[docs]class MapCubeSource(Model):
def __init__(self, name, data):
data['SpectrumType'] = 'PowerLaw'
data['SpatialType'] = 'MapCubeFunction'
data['SpatialModel'] = 'MapCubeFunction'
data['SourceType'] = 'DiffuseSource'
if not 'spectral_pars' in data:
data['spectral_pars'] = {
'Prefactor': {'name': 'Prefactor', 'scale': 1.0,
'value': 1.0, 'min': 0.1, 'max': '10.0',
'free': False},
'Index': {'name': 'Index', 'scale': -1.0,
'value': 0.0, 'min': -1.0, 'max': 1.0,
'free': False},
'Scale': {'name': 'Scale', 'scale': 1.0,
'value': 1000.0,
'min': 1000.0, 'max': 1000.0,
'free': False},
}
super(MapCubeSource, self).__init__(name, data)
self._init_spatial_pars()
@property
def mapcube(self):
return self._data['Spatial_Filename']
@property
def diffuse(self):
return True
def _init_spatial_pars(self):
self['spatial_pars'] = {
'Normalization':
{'name': 'Normalization', 'scale': '1',
'value': '1', 'min': '0', 'max': '10',
'free': '0'}}
[docs] def write_xml(self, root):
source_element = utils.create_xml_element(root, 'source',
dict(name=self.name,
type='DiffuseSource'))
spec_el = utils.create_xml_element(source_element, 'spectrum',
dict(type='PowerLaw'))
filename = utils.path_to_xmlpath(self.mapcube)
spat_el = utils.create_xml_element(source_element, 'spatialModel',
dict(type='MapCubeFunction',
file=filename))
for k, v in self.spectral_pars.items():
utils.create_xml_element(spec_el, 'parameter', v)
for k, v in self.spatial_pars.items():
utils.create_xml_element(spat_el, 'parameter', v)
[docs]class Source(Model):
"""Class representation of a source (non-diffuse) model component.
A source object serves as a container for the properties of that
source (position, spatial/spectral parameters, TS, etc.) as
derived in the current analysis. Most properties of a source
object can be accessed with the bracket operator:
# Return the TS of this source
>>> print src['ts']
# Get a skycoord representation of the source position
>>> print src.skydir
"""
def __init__(self, name, data, radec=None):
data.setdefault('SpatialModel','PointSource')
data.setdefault('SpectrumType','PowerLaw')
data.setdefault('SpatialType',gtutils.get_spatial_type(data['SpatialModel']))
data.setdefault('SourceType',gtutils.get_source_type(data['SpatialType']))
super(Source, self).__init__(name, data)
catalog = self.data.get('catalog', {})
if radec is not None:
self._set_radec(radec)
elif 'ra' in self.data and 'dec' in self.data:
self._set_radec([self.data['ra'], self.data['dec']])
elif 'RAJ2000' in catalog and 'DEJ2000' in catalog:
self._set_radec([catalog['RAJ2000'], catalog['DEJ2000']])
else:
raise Exception('Failed to infer RADEC for source: %s' % name)
self._init_spatial_pars()
def __str__(self):
data = copy.deepcopy(self.data)
data['names'] = self.names
try:
data['flux'], data['flux_err'] = data['flux'][0], data['flux'][1]
except:
data['flux'], data['flux_err'] = 0., 0.
try:
data['eflux'], data['eflux_err'] = data['eflux'][0], data['eflux'][1]
except:
data['eflux'], data['eflux_err'] = 0., 0.
output = []
output += ['{:15s}:'.format('Name') + ' {name:s}']
output += ['{:15s}:'.format('Associations') + ' {names:s}']
output += ['{:15s}:'.format('RA/DEC') + ' {ra:10.3f}/{dec:10.3f}']
output += ['{:15s}:'.format('GLON/GLAT') + ' {glon:10.3f}/{glat:10.3f}']
output += ['{:15s}:'.format('TS') + ' {ts:.2f}']
output += ['{:15s}:'.format('Npred') + ' {npred:.2f}']
output += ['{:15s}:'.format('Flux') + ' {flux:9.4g} +/- {flux_err:8.3g}']
output += ['{:15s}:'.format('EnergyFlux') + ' {eflux:9.4g} +/- {eflux_err:8.3g}']
output += ['{:15s}:'.format('SpatialModel') + ' {SpatialModel:s}']
output += ['{:15s}:'.format('SpectrumType') + ' {SpectrumType:s}']
output += ['Spectral Parameters']
for k, v in self['params'].items():
if isinstance(v, np.ndarray):
output += [
'{:15s}: {:10.4g} +/- {:10.4g}'.format(k, v[0], v[1])]
return '\n'.join(output).format(**data)
def _set_radec(self,radec):
self['radec'] = np.array(radec,ndmin=1)
self['RAJ2000'] = radec[0]
self['DEJ2000'] = radec[1]
self['ra'] = radec[0]
self['dec'] = radec[1]
glonlat = utils.eq2gal(radec[0], radec[1])
self['glon'], self['glat'] = glonlat[0][0], glonlat[1][0]
if 'RA' in self.spatial_pars:
self.spatial_pars['RA']['value'] = radec[0]
self.spatial_pars['DEC']['value'] = radec[1]
def _set_spatial_width(self):
if self['SpatialModel'] in ['GaussianSource','RadialGaussian']:
if self['SpatialWidth'] is None:
self.data.setdefault('Sigma',0.5)
self['SpatialWidth'] = self['Sigma']*1.5095921854516636
else:
self.data.setdefault('Sigma',self['SpatialWidth']/1.5095921854516636)
elif self['SpatialModel'] in ['DiskSource','RadialDisk']:
if self['SpatialWidth'] is None:
self.data.setdefault('Radius',0.5)
self['SpatialWidth'] = self['Radius']*0.8246211251235321
else:
self.data.setdefault('Radius',self['SpatialWidth']/0.8246211251235321)
def _init_spatial_pars(self):
if self['SpatialType'] == 'SkyDirFunction':
self._extended = False
self._data['SourceType'] = 'PointSource'
else:
self._extended = True
self._data['SourceType'] = 'DiffuseSource'
self._set_spatial_width()
if self['SpatialType'] == 'SpatialMap':
self._data['spatial_pars'] = {
'Prefactor': {'name': 'Prefactor', 'value': 1.0,
'free': False, 'min': 0.001, 'max': 1000.0,
'scale': 1.0}
}
else:
self.spatial_pars.setdefault('RA',
{'name': 'RA', 'value': self['ra'],
'free': False,
'min': -360.0, 'max': 360.0, 'scale': 1.0})
self.spatial_pars.setdefault('DEC',
{'name': 'DEC', 'value': self['dec'],
'free': False,
'min': -90.0, 'max': 90.0, 'scale': 1.0})
if self['SpatialType'] == 'RadialGaussian':
self.spatial_pars.setdefault('Sigma',
{'name': 'Sigma', 'value': self['Sigma'],
'free': False, 'min': 0.001, 'max': 10,
'scale': '1.0'})
elif self['SpatialType'] == 'RadialDisk':
self.spatial_pars.setdefault('Radius',
{'name': 'Radius', 'value': self['Radius'],
'free': False, 'min': 0.001, 'max': 10,
'scale': 1.0})
[docs] def load_from_catalog(self):
"""Load spectral parameters from catalog values."""
self._data['spectral_pars'] = \
gtutils.get_function_defaults(self['SpectrumType'])
sp = self['spectral_pars']
catalog = self.data.get('catalog', {})
if self['SpectrumType'] == 'PowerLaw':
sp['Prefactor']['value'] = catalog['Flux_Density']
sp['Prefactor']['scale'] = None
sp['Scale']['value'] = catalog['Pivot_Energy']
sp['Scale']['scale'] = 1.0
sp['Index']['value'] = catalog['Spectral_Index']
sp['Index']['max'] = max(5.0, sp['Index']['value'] + 1.0)
sp['Index']['min'] = min(0.0, sp['Index']['value'] - 1.0)
sp['Index']['scale'] = -1.0
sp['Prefactor'] = gtutils.make_parameter_dict(sp['Prefactor'])
sp['Scale'] = gtutils.make_parameter_dict(sp['Scale'], True)
sp['Index'] = gtutils.make_parameter_dict(sp['Index'])
elif self['SpectrumType'] == 'LogParabola':
sp['norm']['value'] = catalog['Flux_Density']
sp['norm']['scale'] = None
sp['Eb']['value'] = catalog['Pivot_Energy']
sp['alpha']['value'] = catalog['Spectral_Index']
sp['beta']['value'] = catalog['beta']
sp['norm'] = gtutils.make_parameter_dict(sp['norm'])
sp['Eb'] = gtutils.make_parameter_dict(sp['Eb'], True)
sp['alpha'] = gtutils.make_parameter_dict(sp['alpha'])
sp['beta'] = gtutils.make_parameter_dict(sp['beta'])
elif self['SpectrumType'] == 'PLSuperExpCutoff':
flux_density = catalog['Flux_Density']
flux_density *= np.exp(
(catalog['Pivot_Energy'] / catalog['Cutoff']) ** catalog[
'Exp_Index'])
sp['Prefactor']['value'] = flux_density
sp['Prefactor']['scale'] = None
sp['Index1']['value'] = catalog['Spectral_Index']
sp['Index1']['scale'] = -1.0
sp['Index2']['value'] = catalog['Exp_Index']
sp['Index2']['scale'] = 1.0
sp['Scale']['value'] = catalog['Pivot_Energy']
sp['Cutoff']['value'] = catalog['Cutoff']
sp['Prefactor'] = gtutils.make_parameter_dict(sp['Prefactor'])
sp['Scale'] = gtutils.make_parameter_dict(sp['Scale'], True)
sp['Index1'] = gtutils.make_parameter_dict(sp['Index1'])
sp['Index2'] = gtutils.make_parameter_dict(sp['Index2'])
sp['Cutoff'] = gtutils.make_parameter_dict(sp['Cutoff'])
else:
raise Exception('Unsupported spectral type:' + self['SpectrumType'])
[docs] def update_data(self, d):
self._data = utils.merge_dict(self._data, d, add_new_keys=True)
if 'ra' in d and 'dec' in d:
self._set_radec([d['ra'],d['dec']])
#if self.params:
# self._sync_spectral_pars()
[docs] def set_position(self, skydir):
"""
Set the position of the source.
Parameters
----------
skydir : `~astropy.coordinates.SkyCoord`
"""
if not isinstance(skydir, SkyCoord):
skydir = SkyCoord(ra=skydir[0], dec=skydir[1], unit=u.deg)
if not skydir.isscalar:
skydir = np.ravel(skydir)[0]
radec = np.array([skydir.icrs.ra.deg, skydir.icrs.dec.deg])
self._set_radec(radec)
[docs] def set_roi_direction(self,roidir):
offset = roidir.separation(self.skydir).deg
offset_cel = wcs_utils.sky_to_offset(roidir,self['ra'], self['dec'], 'CEL')
offset_gal = wcs_utils.sky_to_offset(roidir,self['glon'], self['glat'], 'GAL')
self['offset'] = offset
self['offset_ra'] = offset_cel[0, 0]
self['offset_dec'] = offset_cel[0, 1]
self['offset_glon'] = offset_gal[0, 0]
self['offset_glat'] = offset_gal[0, 1]
[docs] def set_roi_projection(self,proj):
if proj is None:
return
self['offset_roi_edge'] = proj.distance_to_edge(self.skydir)
[docs] def set_spatial_model(self, spatial_model, spatial_width=None):
self._data['SpatialModel'] = spatial_model
self._data['SpatialWidth'] = spatial_width
self._data['SpatialType'] = gtutils.get_spatial_type(self['SpatialModel'])
self._data['spatial_pars'] = {}
self._init_spatial_pars()
[docs] def separation(self, src):
if isinstance(src, Source):
return self.radec.separation(src.skydir)
else:
return self.radec.separation(src)
@property
def diffuse(self):
return False
@property
def extended(self):
return self._extended
@property
def associations(self):
return self._names
@property
def radec(self):
return self['radec']
@property
def skydir(self):
"""Return a SkyCoord representation of the source position.
Returns
-------
skydir : `~astropy.coordinates.SkyCoord`
"""
return SkyCoord(self.radec[0] * u.deg, self.radec[1] * u.deg)
@property
def data(self):
return self._data
@staticmethod
[docs] def create_from_dict(src_dict, roi_skydir=None):
"""Create a source object from a python dictionary.
Parameters
----------
src_dict : dict
Dictionary defining the properties of the source.
"""
src_dict = copy.deepcopy(src_dict)
src_dict.setdefault('SpatialModel','PointSource')
src_dict.setdefault('Spectrum_Filename',None)
spectrum_type = src_dict.setdefault('SpectrumType','PowerLaw')
spatial_type = \
src_dict.setdefault('SpatialType',
gtutils.get_spatial_type(src_dict['SpatialModel']))
spectral_pars = \
src_dict.setdefault('spectral_pars',
gtutils.get_function_defaults(spectrum_type))
spatial_pars = \
src_dict.setdefault('spatial_pars',
gtutils.get_function_defaults(src_dict['SpatialType']))
if 'file' in src_dict:
src_dict['Spectrum_Filename'] = src_dict.pop('file')
if spectrum_type == 'DMFitFunction' and src_dict['Spectrum_Filename'] is None:
src_dict['Spectrum_Filename'] = os.path.join('$FERMIPY_DATA_DIR',
'gammamc_dif.dat')
for k in ['RA','DEC','Prefactor']:
if k in spatial_pars:
del spatial_pars[k]
for k, v in spectral_pars.items():
if k not in src_dict:
continue
if not isinstance(src_dict[k], dict):
spectral_pars[k].update({'name': k,
'value': src_dict.pop(k)})
else:
spectral_pars[k].update(src_dict.pop(k))
for k, v in spatial_pars.items():
if k not in src_dict:
continue
if not isinstance(src_dict[k], dict):
spatial_pars[k].update({'name': k, 'value': src_dict[k]})
else:
spatial_pars[k].update(src_dict.pop(k))
for k, v in spectral_pars.items():
spectral_pars[k] = gtutils.make_parameter_dict(spectral_pars[k])
for k, v in spatial_pars.items():
spatial_pars[k] = gtutils.make_parameter_dict(spatial_pars[k],
rescale=False)
src_dict['spectral_pars'] = gtutils.cast_pars_dict(spectral_pars)
src_dict['spatial_pars'] = gtutils.cast_pars_dict(spatial_pars)
# validate_config(src_dict,default_src_dict)
if 'name' in src_dict:
name = src_dict['name']
src_dict['Source_Name'] = src_dict.pop('name')
elif 'Source_Name' in src_dict:
name = src_dict['Source_Name']
else:
raise Exception('Source name undefined.')
skydir = wcs_utils.get_target_skydir(src_dict, roi_skydir)
src_dict['RAJ2000'] = skydir.ra.deg
src_dict['DEJ2000'] = skydir.dec.deg
radec = np.array([skydir.ra.deg, skydir.dec.deg])
return Source(name, src_dict, radec=radec)
@staticmethod
[docs] def create_from_xml(root, extdir=None):
"""Create a Source object from an XML node."""
spec = utils.load_xml_elements(root, 'spectrum')
spat = utils.load_xml_elements(root, 'spatialModel')
spectral_pars = utils.load_xml_elements(root, 'spectrum/parameter')
spatial_pars = utils.load_xml_elements(root, 'spatialModel/parameter')
spectral_pars = gtutils.cast_pars_dict(spectral_pars)
spatial_pars = gtutils.cast_pars_dict(spatial_pars)
src_type = root.attrib['type']
spatial_type = spat['type']
spectral_type = spec['type']
xml_dict = copy.deepcopy(root.attrib)
src_dict = {'catalog': xml_dict}
src_dict['Source_Name'] = xml_dict['name']
src_dict['SpectrumType'] = spec['type']
src_dict['SpatialType'] = spatial_type
src_dict['SourceType'] = src_type
src_dict['Spatial_Filename'] = None
src_dict['Spectrum_Filename'] = None
if 'file' in spat:
src_dict['Spatial_Filename'] = utils.xmlpath_to_path(spat['file'])
if not os.path.isfile(src_dict['Spatial_Filename']) \
and extdir is not None:
src_dict['Spatial_Filename'] = \
os.path.join(extdir, 'Templates',
src_dict['Spatial_Filename'])
if 'file' in spec:
src_dict['Spectrum_Filename'] = utils.xmlpath_to_path(spec['file'])
if src_type == 'PointSource':
src_dict['SpatialModel'] = 'PointSource'
elif spatial_type == 'SpatialMap':
src_dict['SpatialModel'] = 'SpatialMap'
else:
src_dict['SpatialModel'] = spatial_type
if src_type == 'PointSource' or \
spatial_type in ['SpatialMap','RadialGaussian','RadialDisk']:
if 'RA' in src_dict:
src_dict['RAJ2000'] = float(xml_dict['RA'])
src_dict['DEJ2000'] = float(xml_dict['DEC'])
elif 'RA' in spatial_pars:
src_dict['RAJ2000'] = float(spatial_pars['RA']['value'])
src_dict['DEJ2000'] = float(spatial_pars['DEC']['value'])
else:
hdu = pyfits.open(
os.path.expandvars(src_dict['Spatial_Filename']))
src_dict['RAJ2000'] = float(hdu[0].header['CRVAL1'])
src_dict['DEJ2000'] = float(hdu[0].header['CRVAL2'])
radec = np.array([src_dict['RAJ2000'], src_dict['DEJ2000']])
src_dict['spectral_pars'] = spectral_pars
src_dict['spatial_pars'] = spatial_pars
return Source(src_dict['Source_Name'],
src_dict, radec=radec)
elif src_type == 'DiffuseSource' and spatial_type == 'ConstantValue':
return IsoSource(src_dict['Source_Name'],
{'Spectrum_Filename' : spec['file'],
'spectral_pars' : spectral_pars,
'spatial_pars' : spatial_pars})
elif src_type == 'DiffuseSource' and spatial_type == 'MapCubeFunction':
return MapCubeSource(src_dict['Source_Name'],
{'Spatial_Filename' : spat['file'],
'spectral_pars' : spectral_pars,
'spatial_pars' : spatial_pars})
else:
raise Exception(
'Unrecognized type for source: %s' % src_dict['Source_Name'])
[docs] def write_xml(self, root):
"""Write this source to an XML node."""
if not self.extended:
source_element = utils.create_xml_element(root, 'source',
dict(name=self[
'Source_Name'],
type='PointSource'))
spat_el = ElementTree.SubElement(source_element, 'spatialModel')
spat_el.set('type', 'SkyDirFunction')
elif self['SpatialType'] == 'SpatialMap':
source_element = utils.create_xml_element(root, 'source',
dict(name=self[
'Source_Name'],
type='DiffuseSource'))
filename = utils.path_to_xmlpath(self['Spatial_Filename'])
spat_el = utils.create_xml_element(source_element, 'spatialModel',
dict(map_based_integral='True',
type='SpatialMap',
file=filename))
else:
source_element = utils.create_xml_element(root, 'source',
dict(name=self['Source_Name'],
type='DiffuseSource'))
spat_el = utils.create_xml_element(source_element, 'spatialModel',
dict(type=self['SpatialType']))
for k, v in self.spatial_pars.items():
utils.create_xml_element(spat_el, 'parameter', v)
el = ElementTree.SubElement(source_element, 'spectrum')
stype = self['SpectrumType'].strip()
el.set('type', stype)
if self['Spectrum_Filename'] is not None:
filename = utils.path_to_xmlpath(self['Spectrum_Filename'])
el.set('file',filename)
for k, v in self.spectral_pars.items():
utils.create_xml_element(el, 'parameter', v)
[docs]class ROIModel(fermipy.config.Configurable):
"""This class is responsible for managing the ROI model (both sources
and diffuse components). Source catalogs can be read
from either FITS or XML files. Individual components are
represented by instances of `~fermipy.roi_model.Model` and can be
accessed by name using the bracket operator.
* Create an ROI with all 3FGL sources and print a summary of its contents:
>>> skydir = astropy.coordinates.SkyCoord(0.0,0.0,unit='deg')
>>> roi = ROIModel({'catalogs' : ['3FGL'],'src_roiwidth' : 10.0},skydir=skydir)
>>> print roi
name SpatialModel SpectrumType offset ts npred
--------------------------------------------------------------------------------
3FGL J2357.3-0150 PointSource PowerLaw 1.956 nan 0.0
3FGL J0006.2+0135 PointSource PowerLaw 2.232 nan 0.0
3FGL J0016.3-0013 PointSource PowerLaw 4.084 nan 0.0
3FGL J0014.3-0455 PointSource PowerLaw 6.085 nan 0.0
* Print a summary of an individual source
>>> print roi['3FGL J0006.2+0135']
* Get the SkyCoord for a source
>>> dir = roi['SourceA'].skydir
* Loop over all sources and print their names
>>> for s in roi.sources: print s.name
"""
defaults = dict(defaults.model.items(),
logfile=(None, '', str),
fileio=defaults.fileio,
logging=defaults.logging)
src_name_cols = ['Source_Name',
'ASSOC', 'ASSOC1', 'ASSOC2', 'ASSOC_GAM',
'1FHL_Name', '2FGL_Name', '3FGL_Name',
'ASSOC_GAM1', 'ASSOC_GAM2', 'ASSOC_TEV']
def __init__(self, config=None, **kwargs):
# Coordinate for ROI center (defaults to 0,0)
self._skydir = kwargs.pop('skydir', SkyCoord(0.0, 0.0, unit=u.deg))
self._projection = kwargs.get('projection',None)
coordsys = kwargs.pop('coordsys', 'CEL')
super(ROIModel, self).__init__(config, **kwargs)
self.logger = Logger.get(self.__class__.__name__,
self.config['logfile'],
log_level(self.config['logging']['verbosity']))
if self.config['extdir'] is not None and \
not os.path.isdir(os.path.expandvars(self.config['extdir'])):
self._config['extdir'] = \
os.path.join('$FERMIPY_DATA_DIR',
'catalogs', self.config['extdir'])
self._src_radius = self.config['src_radius']
if self.config['src_roiwidth'] is not None:
self._config['src_radius_roi'] = self.config['src_roiwidth'] * 0.5
self._srcs = []
self._diffuse_srcs = []
self._src_dict = collections.defaultdict(set)
self._src_radius = []
self.load(coordsys=coordsys)
def __contains__(self, key):
key = key.replace(' ', '').lower()
return key in self._src_dict.keys()
def __getitem__(self, key):
return self.get_source_by_name(key)
def __iter__(self):
return iter(self._srcs + self._diffuse_srcs)
def __str__(self):
o = ''
o += '%-20s%-15s%-15s%8s%10s%12s\n' % (
'name', 'SpatialModel', 'SpectrumType', 'offset',
'ts', 'npred')
o += '-' * 80 + '\n'
for s in sorted(self.sources, key=lambda t: t['offset']):
if s.diffuse: continue
o += '%-20.19s%-15.14s%-15.14s%8.3f%10.2f%12.1f\n' % (
s['name'], s['SpatialModel'],
s['SpectrumType'],
s['offset'], s['ts'], s['npred'])
for s in sorted(self.sources, key=lambda t: t['offset']):
if not s.diffuse: continue
o += '%-20.19s%-15.14s%-15.14s%8s%10.2f%12.1f\n' % (
s['name'], s['SpatialModel'],
s['SpectrumType'],
'-----', s['ts'], s['npred'])
return o
@property
def skydir(self):
"""Return the sky direction corresponding to the center of the
ROI."""
return self._skydir
@property
def projection(self):
return self._projection
@property
def sources(self):
return self._srcs + self._diffuse_srcs
@property
def point_sources(self):
return self._srcs
@property
def diffuse_sources(self):
return self._diffuse_srcs
[docs] def set_projection(self,proj):
self._projection = proj
for s in self._srcs:
s.set_roi_projection(proj)
[docs] def clear(self):
"""Clear the contents of the ROI."""
self._srcs = []
self._diffuse_srcs = []
self._src_dict = collections.defaultdict(set)
self._src_radius = []
[docs] def load_diffuse_srcs(self):
self._load_diffuse_src('isodiff')
self._load_diffuse_src('galdiff')
self._load_diffuse_src('limbdiff')
self._load_diffuse_src('diffuse')
def _load_diffuse_src(self, name, src_type='FileFunction'):
if 'FERMI_DIR' in os.environ and 'FERMI_DIFFUSE_DIR' not in os.environ:
os.environ['FERMI_DIFFUSE_DIR'] = \
os.path.expandvars('$FERMI_DIR/refdata/fermi/galdiffuse')
if 'FERMIPY_WORKDIR' not in os.environ:
if self.config['fileio']['workdir'] is not None:
os.environ['FERMIPY_WORKDIR'] = self.config['fileio']['workdir']
else:
os.environ['FERMIPY_WORKDIR'] = os.getcwd()
srcs = []
if self.config[name] is not None:
srcs = self.config[name]
for i, t in enumerate(srcs):
if utils.isstr(t):
src_dict = {'file': t}
elif isinstance(t, dict):
src_dict = copy.deepcopy(t)
else:
raise Exception('Invalid type in diffuse mode list: %s'%str(type(t)))
src_dict['file'] = \
resolve_file_path(src_dict['file'],
search_dirs=['$FERMIPY_WORKDIR',
os.path.join('$FERMIPY_ROOT',
'data'),
'$FERMI_DIFFUSE_DIR'])
if 'name' not in src_dict:
if len(srcs) == 1:
src_dict['name'] = name
else:
src_dict['name'] = name + '%02i' % i
if re.search(r'(\.txt$)', src_dict['file']):
src_type = 'FileFunction'
elif re.search(r'(\.fits$|\.fit$|\.fits.gz$|\.fit.gz$)',
src_dict['file']):
src_type = 'MapCubeFunction'
else:
raise Exception(
'Unrecognized file format for diffuse model: %s' % src_dict[
'file'])
# Extract here
if src_type == 'FileFunction':
src = IsoSource(src_dict['name'], {'Spectrum_Filename' : src_dict['file']})
altname = os.path.basename(src_dict['file'])
altname = re.sub(r'(\.txt$)', '', altname)
else:
src = MapCubeSource(src_dict['name'], {'Spatial_Filename' : src_dict['file']})
altname = os.path.basename(src_dict['file'])
altname = re.sub(r'(\.fits$|\.fit$|\.fits.gz$|\.fit.gz$)',
'', altname)
src.add_name(altname)
self.load_source(src, False, self.config['merge_sources'])
[docs] def create_source(self, name, src_dict, build_index=True,
merge_sources=True):
"""Add a new source to the ROI model from a dictionary or an
existing source object.
Parameters
----------
name : str
src_dict : dict or `~fermipy.roi_model.Source`
Returns
-------
src : `~fermipy.roi_model.Source`
"""
src_dict = copy.deepcopy(src_dict)
if isinstance(src_dict,dict):
src_dict['name'] = name
src = Model.create_from_dict(src_dict,self.skydir)
else:
src = src_dict
if isinstance(src,Source):
src.set_roi_direction(self.skydir)
src.set_roi_projection(self.projection)
self.logger.debug('Creating source ' + src.name)
self.load_source(src, build_index=build_index,
merge_sources=merge_sources)
return self.get_source_by_name(name)
[docs] def copy_source(self, name):
src = self.get_source_by_name(name)
return copy.deepcopy(src)
[docs] def load_sources(self, sources):
"""Delete all sources in the ROI and load the input source list."""
self.logger.debug('Loading sources')
self.clear()
for s in sources:
if isinstance(s, dict):
s = Model.create_from_dict(s)
self.load_source(s, build_index=False)
self._build_src_index()
self.logger.debug('Finished')
[docs] def load_source(self, src, build_index=True, merge_sources=True,
**kwargs):
"""
Load a single source.
Parameters
----------
src : `~fermipy.roi_model.Source`
Source object that will be added to the ROI.
merge_sources : bool
When a source matches an existing source in the model
update that source with the properties of the new source.
build_index : bool
Re-make the source index after loading this source.
"""
src = copy.deepcopy(src)
name = src.name.replace(' ', '').lower()
min_sep = kwargs.get('min_separation',None)
if min_sep is not None:
sep = src.skydir.separation(self._src_skydir).deg
if len(sep) > 0 and np.min(sep) < min_sep:
return
match_srcs = self.match_source(src)
if len(match_srcs) == 1:
self.logger.debug('Found matching source for %s : %s'
%( src.name, match_srcs[0].name ) )
if merge_sources:
self.logger.debug('Updating source model for %s' % src.name)
match_srcs[0].update_from_source(src)
else:
match_srcs[0].add_name(src.name)
self.logger.debug('Skipping source model for %s' % src.name)
self._src_dict[src.name.replace(' ', '').lower()].add(match_srcs[0])
return
elif len(match_srcs) > 2:
self.logger.warning('Multiple sources matching %s' % name)
return
self._src_dict[src.name].add(src)
for name in src.names:
self._src_dict[name.replace(' ', '').lower()].add(src)
if isinstance(src, Source):
self._srcs.append(src)
else:
self._diffuse_srcs.append(src)
if build_index:
self._build_src_index()
[docs] def match_source(self,src):
"""Look for source or sources in the model that match the
given source. Sources are matched by name and any association
columns defined in the assoc_xmatch_columns parameter.
"""
srcs = set()
names = [src.name]
for col in self.config['assoc_xmatch_columns']:
if col in src.assoc and src.assoc[col]:
names += [src.assoc[col]]
for name in names:
name = name.replace(' ', '').lower()
if name in self._src_dict and self._src_dict[name]:
srcs.update(self._src_dict[name])
return list(srcs)
[docs] def load(self, **kwargs):
"""Load both point source and diffuse components."""
self.logger.debug('Starting')
coordsys = kwargs.get('coordsys', 'CEL')
extdir = kwargs.get('extdir', self.config['extdir'])
self.clear()
self.load_diffuse_srcs()
for c in self.config['catalogs']:
extname = os.path.splitext(c)[1]
if extname != '.xml':
self.load_fits_catalog(c, extdir=extdir, coordsys=coordsys)
elif extname == '.xml':
self.load_xml(c, extdir=extdir, coordsys=coordsys)
else:
raise Exception('Unrecognized catalog file extension: %s' % c)
for c in self.config['sources']:
if 'name' not in c:
raise Exception('No name field in source dictionary:\n ' + str(c))
self.create_source(c['name'],c, build_index=False)
self._build_src_index()
[docs] def delete_sources(self, srcs):
for k, v in self._src_dict.items():
for s in srcs:
if s in v:
self._src_dict[k].remove(s)
if not v:
del self._src_dict[k]
self._srcs = [s for s in self._srcs if s not in srcs]
self._diffuse_srcs = [s for s in self._diffuse_srcs if s not in srcs]
self._build_src_index()
@staticmethod
[docs] def create_from_roi_data(datafile):
"""Create an ROI model."""
data = np.load(datafile).flat[0]
roi = ROIModel()
roi.load_sources(data['sources'].values())
return roi
@staticmethod
[docs] def create(selection, config, **kwargs):
"""Create an ROIModel instance."""
if selection['target'] is not None:
return ROIModel.create_from_source(selection['target'],
config, **kwargs)
else:
target_skydir = wcs_utils.get_target_skydir(selection)
return ROIModel.create_from_position(target_skydir,
config, **kwargs)
@staticmethod
[docs] def create_from_position(skydir, config, **kwargs):
"""Create an ROIModel instance centered on a sky direction.
Parameters
----------
skydir : `~astropy.coordinates.SkyCoord`
Sky direction on which the ROI will be centered.
config : dict
Model configuration dictionary.
"""
coordsys = kwargs.pop('coordsys', 'CEL')
roi = ROIModel(config, skydir=skydir, coordsys=coordsys, **kwargs)
return roi
srcs_dict = {}
if roi.config['src_radius'] is not None:
rsrc, srcs = roi.get_sources_by_position(skydir,
roi.config['src_radius'])
for s, r in zip(srcs, rsrc):
srcs_dict[s.name] = (s, r)
if roi.config['src_roiwidth'] is not None:
rsrc, srcs = \
roi.get_sources_by_position(skydir,
roi.config['src_roiwidth'] / 2.,
square=True, coordsys=coordsys)
for s, r in zip(srcs, rsrc):
srcs_dict[s.name] = (s, r)
srcs = []
rsrc = []
for k, v in srcs_dict.items():
srcs.append(v[0])
rsrc.append(v[1])
return ROIModel(config, srcs=srcs,
diffuse_srcs=roi._diffuse_srcs,
skydir=skydir, **kwargs)
@staticmethod
[docs] def create_from_source(name, config, **kwargs):
"""Create an ROI centered on the given source."""
coordsys = kwargs.pop('coordsys', 'CEL')
roi = ROIModel(config, src_radius=None, src_roiwidth=None, **kwargs)
src = roi.get_source_by_name(name)
return ROIModel.create_from_position(src.skydir, config,
coordsys=coordsys, **kwargs)
@staticmethod
[docs] def create_roi_from_ft1(ft1file, config):
"""Create an ROI model by extracting the sources coordinates
form an FT1 file."""
pass
[docs] def has_source(self, name):
index_name = name.replace(' ', '').lower()
if index_name in self._src_dict:
return True
else:
return False
[docs] def get_source_by_name(self, name):
"""Return a single source in the ROI with the given name. The
input name string can match any of the strings in the names
property of the source object. Case and whitespace are
ignored when matching name strings. If no sources are found
or multiple sources then an exception is thrown.
Parameters
----------
name : str
Name string.
Returns
-------
srcs : `~fermipy.roi_model.Model`
A source object.
"""
srcs = self.get_sources_by_name(name)
if len(srcs) == 1:
return srcs[0]
elif len(srcs) == 0:
raise Exception('No source matching name: ' + name)
elif len(srcs) > 1:
raise Exception('Multiple sources matching name: ' + name)
[docs] def get_sources_by_name(self, name):
"""Return a list of sources in the ROI matching the given
name. The input name string can match any of the strings in
the names property of the source object. Case and whitespace
are ignored when matching name strings.
Parameters
----------
name : str
Returns
-------
srcs : list
A list of `~fermipy.roi_model.Model` objects.
"""
index_name = name.replace(' ', '').lower()
if index_name in self._src_dict:
return list(self._src_dict[index_name])
if len(srcs) == 1 and unique:
return srcs[0]
elif not unique:
return srcs
else:
raise Exception('Multiple sources matching name: ' + name)
else:
raise Exception('No source matching name: ' + name)
[docs] def get_nearby_sources(self, name, dist, min_dist=None,
square=False):
src = self.get_source_by_name(name)
return self.get_sources_by_position(src.skydir,
dist, min_dist,
square)
[docs] def get_sources(self, skydir=None, distance=None, cuts=None,
minmax_ts=None, minmax_npred=None, square=False,
exclude_diffuse=False,
coordsys='CEL'):
"""Retrieve list of sources satisfying the given selections.
Returns
-------
srcs : list
List of source objects.
"""
if skydir is None:
skydir = self.skydir
rsrc, srcs = self.get_sources_by_position(skydir,
distance,
square=square,
coordsys=coordsys)
o = []
for s, r in zip(srcs, rsrc):
if not s.check_cuts(cuts):
continue
ts = s['ts']
npred = s['npred']
if not utils.apply_minmax_selection(ts, minmax_ts):
continue
if not utils.apply_minmax_selection(npred, minmax_npred):
continue
o.append(s)
for s in self.diffuse_sources:
if exclude_diffuse:
continue
if not s.check_cuts(cuts):
continue
ts = s['ts']
npred = s['npred']
if not utils.apply_minmax_selection(ts, minmax_ts):
continue
if not utils.apply_minmax_selection(npred, minmax_npred):
continue
o.append(s)
return o
[docs] def get_sources_by_property(self, pname, pmin, pmax=None):
srcs = []
for i, s in enumerate(self._srcs):
if pname not in s:
continue
if pmin is not None and s[pname] < pmin:
continue
if pmax is not None and s[pname] > pmax:
continue
srcs.append(s)
return srcs
[docs] def get_sources_by_position(self, skydir, dist, min_dist=None,
square=False, coordsys='CEL'):
"""Retrieve sources within a certain angular distance of a sky
coordinate. This function supports two types of geometric
selections: circular (square=False) and square (square=True).
The circular selection finds all sources with a given angular
distance of the target position. The square selection finds
sources within an ROI-like region of size R x R where R = 2 x
dist.
Parameters
----------
skydir : `~astropy.coordinates.SkyCoord`
Sky direction with respect to which the selection will be applied.
dist : float
Maximum distance in degrees from the sky coordinate.
square : bool
Choose whether to apply a circular or square selection.
coordsys : str
Coordinate system to use when applying a selection with square=True.
"""
msk = get_skydir_distance_mask(self._src_skydir, skydir, dist,
min_dist=min_dist, square=square,
coordsys=coordsys)
radius = self._src_skydir.separation(skydir).deg
radius = radius[msk]
srcs = [self._srcs[i] for i in np.nonzero(msk)[0]]
isort = np.argsort(radius)
radius = radius[isort]
srcs = [srcs[i] for i in isort]
return radius, srcs
[docs] def load_fits_catalog(self, name, **kwargs):
"""Load sources from a FITS catalog file.
Parameters
----------
name : str
Catalog name or path to a catalog FITS file.
"""
self.logger.debug('Loading FITS catalog: %s'%name)
coordsys = kwargs.get('coordsys', 'CEL')
extdir = kwargs.get('extdir', self.config['extdir'])
cat = catalog.Catalog.create(name)
m0 = get_skydir_distance_mask(cat.skydir, self.skydir,
self.config['src_radius'])
m1 = get_skydir_distance_mask(cat.skydir, self.skydir,
self.config['src_radius_roi'],
square=True, coordsys=coordsys)
m = (m0 & m1)
offset = self.skydir.separation(cat.skydir).deg
offset_cel = wcs_utils.sky_to_offset(self.skydir,
cat.radec[:, 0], cat.radec[:, 1],
'CEL')
offset_gal = wcs_utils.sky_to_offset(self.skydir,
cat.glonlat[:, 0], cat.glonlat[:, 1],
'GAL')
for i, (row, radec) in enumerate(zip(cat.table[m],
cat.radec[m])):
catalog_dict = catalog.row_to_dict(row)
src_dict = {'catalog': catalog_dict}
src_dict['Source_Name'] = row['Source_Name']
src_dict['SpectrumType'] = row['SpectrumType']
if row['extended']:
src_dict['SourceType'] = 'DiffuseSource'
src_dict['SpatialType'] = 'SpatialMap'
src_dict['SpatialModel'] = 'SpatialMap'
search_dirs = []
if extdir is not None:
search_dirs += [extdir, os.path.join(extdir, 'Templates')]
search_dirs += [row['extdir'],
os.path.join(row['extdir'], 'Templates')]
src_dict['Spatial_Filename'] = resolve_file_path(
row['Spatial_Filename'],
search_dirs=search_dirs)
else:
src_dict['SourceType'] = 'PointSource'
src_dict['SpatialType'] = 'SkyDirFunction'
src_dict['SpatialModel'] = 'PointSource'
src = Source(src_dict['Source_Name'], src_dict, radec=radec)
src.load_from_catalog()
src.data['offset'] = offset[m][i]
src.data['offset_ra'] = offset_cel[:, 0][m][i]
src.data['offset_dec'] = offset_cel[:, 1][m][i]
src.data['offset_glon'] = offset_gal[:, 0][m][i]
src.data['offset_glat'] = offset_gal[:, 1][m][i]
self.load_source(src, False,
merge_sources=self.config['merge_sources'])
self._build_src_index()
[docs] def load_xml(self, xmlfile, **kwargs):
"""Load sources from an XML file."""
extdir = kwargs.get('extdir', self.config['extdir'])
coordsys = kwargs.get('coordsys', 'CEL')
if not os.path.isfile(xmlfile):
xmlfile = os.path.join(fermipy.PACKAGE_DATA, 'catalogs', xmlfile)
self.logger.info('Reading XML Model: ' + xmlfile)
root = ElementTree.ElementTree(file=xmlfile).getroot()
diffuse_srcs = []
srcs = []
ra, dec = [], []
for s in root.findall('source'):
src = Source.create_from_xml(s, extdir=extdir)
if src.diffuse:
diffuse_srcs += [src]
else:
srcs += [src]
ra += [src['RAJ2000']]
dec += [src['DEJ2000']]
src_skydir = SkyCoord(ra=np.array(ra) * u.deg,
dec=np.array(dec) * u.deg)
radec = np.vstack((src_skydir.ra.deg, src_skydir.dec.deg)).T
glonlat = np.vstack((src_skydir.galactic.l.deg,
src_skydir.galactic.b.deg)).T
offset = self.skydir.separation(src_skydir).deg
offset_cel = wcs_utils.sky_to_offset(self.skydir,
radec[:, 0], radec[:, 1], 'CEL')
offset_gal = wcs_utils.sky_to_offset(self.skydir,
glonlat[:, 0], glonlat[:, 1], 'GAL')
m0 = get_skydir_distance_mask(src_skydir, self.skydir,
self.config['src_radius'])
m1 = get_skydir_distance_mask(src_skydir, self.skydir,
self.config['src_radius_roi'],
square=True, coordsys=coordsys)
m = (m0 & m1)
srcs = np.array(srcs)[m]
for i, s in enumerate(srcs):
s.data['offset'] = offset[m][i]
s.data['offset_ra'] = offset_cel[:, 0][m][i]
s.data['offset_dec'] = offset_cel[:, 1][m][i]
s.data['offset_glon'] = offset_gal[:, 0][m][i]
s.data['offset_glat'] = offset_gal[:, 1][m][i]
self.load_source(s, False,
merge_sources=self.config['merge_sources'])
for i, s in enumerate(diffuse_srcs):
self.load_source(s, False,
merge_sources=self.config['merge_sources'])
self._build_src_index()
def _build_src_index(self):
"""Build an indices for fast lookup of a source given its name
or coordinates."""
self._srcs = sorted(self._srcs, key=lambda t: t['offset'])
nsrc = len(self._srcs)
radec = np.zeros((2, nsrc))
for i, src in enumerate(self._srcs):
radec[:, i] = src.radec
self._src_skydir = SkyCoord(ra=radec[0], dec=radec[1], unit=u.deg)
self._src_radius = self._src_skydir.separation(self.skydir)
[docs] def write_xml(self, xmlfile):
"""Save the ROI model as an XML file."""
root = ElementTree.Element('source_library')
root.set('title', 'source_library')
for s in self._srcs:
s.write_xml(root)
for s in self._diffuse_srcs:
s.write_xml(root)
output_file = open(xmlfile, 'w')
output_file.write(utils.prettify_xml(root))
[docs] def create_table(self):
scan_shape = (1,)
for src in self._srcs:
scan_shape = max(scan_shape,src['dloglike_scan'].shape)
tab = create_source_table(scan_shape)
row_dict = {}
for s in self._srcs:
row_dict['Source_Name'] = s['name']
row_dict['RAJ2000'] = s['ra']
row_dict['DEJ2000'] = s['dec']
row_dict['GLON'] = s['glon']
row_dict['GLAT'] = s['glat']
row_dict['param_names'] = np.empty(6,dtype='S32')
row_dict['param_names'].fill('')
row_dict['param_values'] = np.empty(6,dtype=float)*np.nan
row_dict['param_errors'] = np.empty(6,dtype=float)*np.nan
params = copy.deepcopy(s['params'])
if 'spectrum_type' in params:
del params['spectrum_type']
param_names = gtutils.get_function_par_names(s['SpectrumType'])
for i, k in enumerate(param_names[:6]):
row_dict['param_names'][i] = k
row_dict['param_values'][i] = params[k][0]
row_dict['param_errors'][i] = params[k][1]
r68_semimajor = s['pos_sigma_semimajor']*s['pos_r68']/s['pos_sigma']
r68_semiminor = s['pos_sigma_semiminor']*s['pos_r68']/s['pos_sigma']
r95_semimajor = s['pos_sigma_semimajor']*s['pos_r95']/s['pos_sigma']
r95_semiminor = s['pos_sigma_semiminor']*s['pos_r95']/s['pos_sigma']
row_dict['Conf_68_PosAng'] = s['pos_angle']
row_dict['Conf_68_SemiMajor'] = r68_semimajor
row_dict['Conf_68_SemiMinor'] = r68_semiminor
row_dict['Conf_95_PosAng'] = s['pos_angle']
row_dict['Conf_95_SemiMajor'] = r95_semimajor
row_dict['Conf_95_SemiMinor'] = r95_semiminor
row_dict.update(s.get_catalog_dict())
for t in s.data.keys():
if t == 'params':
continue
if t in tab.columns:
row_dict[t] = s[t]
row = [row_dict[k] for k in tab.columns]
tab.add_row(row)
return tab
[docs] def write_fits(self,fitsfile):
"""Write the ROI model to a FITS file."""
tab = self.create_table()
tab.write(fitsfile,format='fits',overwrite=True)
hdulist = pyfits.open(fitsfile)
for h in hdulist:
h.header['CREATOR'] = 'fermipy ' + fermipy.__version__
hdulist.writeto(fitsfile, clobber=True)
if __name__ == '__main__':
roi = ROIModel()
roi.load_fits('gll_fssc_psc_v14.fit')
src = roi.get_source_by_name('lmc')
import pprint
pprint.pprint(src.data)
print(src)
srcs = roi.get_nearby_sources('lmc', 10.0)
roi.create_roi_from_source('test.xml', 'lmc', 'test', 'test', 90.0)