Source code for fermipy.sed

"""
Utilities for dealing with SEDs

Many parts of this code are taken from dsphs/like/lnlfn.py by 
  Matthew Wood <mdwood@slac.stanford.edu>
  Alex Drlica-Wagner <kadrlica@slac.stanford.edu>
"""

from __future__ import absolute_import, division, print_function, \
    unicode_literals

import copy
import logging
import os

import numpy as np
from scipy.interpolate import UnivariateSpline, splrep, splev
import scipy.optimize as opt
import scipy.special as spf
from scipy.integrate import quad
import scipy

import pyLikelihood as pyLike

import astropy.io.fits as pf
from astropy.coordinates import SkyCoord
from astropy.table import Table, Column

import fermipy.config
import fermipy.defaults as defaults
import fermipy.utils as utils
import fermipy.gtutils as gtutils
import fermipy.roi_model as roi_model
from fermipy.wcs_utils import wcs_add_energy_axis
from fermipy.fits_utils import read_energy_bounds, read_spectral_data
from fermipy.skymap import read_map_from_fits, Map
from fermipy.logger import Logger
from fermipy.logger import logLevel
from fermipy.sourcefind import find_peaks, refine_peak
from fermipy.spectrum import SpectralFunction

from LikelihoodState import LikelihoodState

# Some useful functions

FluxTypes = ['NORM','FLUX','EFLUX','NPRED','DIF_FLUX','DIF_EFLUX']

PAR_NAMES = {"PowerLaw":["Prefactor","Index"],
             "LogParabola":["norm","alpha","beta"],
             "PLExpCutoff":["Prefactor","Index1","Cutoff"]}

class SEDGenerator(object):
    """Mixin class that provides SED functionality to
    `~fermipy.gtanalysis.GTAnalysis`."""

    def sed(self, name, profile=True, energies=None, **kwargs):
        """Generate a spectral energy distribution (SED) for a source.  This
        function will fit the normalization of the source in each
        energy bin.  By default the SED will be generated with the
        analysis energy bins but a custom binning can be defined with
        the ``energies`` parameter.

        Parameters
        ----------
        name : str
            Source name.

        prefix : str
            Optional string that will be prepended to all output files
            (FITS and rendered images).

        profile : bool
            Profile the likelihood in each energy bin.

        energies : `~numpy.ndarray`
            Sequence of energies in log10(E/MeV) defining the edges of
            the energy bins.  If this argument is None then the
            analysis energy bins will be used.  The energies in this
            sequence must align with the bin edges of the underlying
            analysis instance.

        bin_index : float
            Spectral index that will be used when fitting the energy
            distribution within an energy bin.

        use_local_index : bool
            Use a power-law approximation to the shape of the global
            spectrum in each bin.  If this is false then a constant
            index set to ``bin_index`` will be used.

        fix_background : bool
            Fix background components when fitting the flux
            normalization in each energy bin.  If fix_background=False
            then all background parameters that are currently free in
            the fit will be profiled.  By default fix_background=True.

        ul_confidence : float
            Set the confidence level that will be used for the
            calculation of flux upper limits in each energy bin.

        cov_scale : float
            Scaling factor that will be applied when setting the
            gaussian prior on the normalization of free background
            sources.  If this parameter is None then no gaussian prior
            will be applied.

        write_fits : bool
            Write the output of the analysis to a FITS file.

        write_npy : bool
            Write the output dictionary of the analysis to a numpy file.

        Returns
        -------
        sed : dict
            Dictionary containing output of the SED analysis.  This
            dictionary is also saved to the 'sed' dictionary of the
            `~fermipy.roi_model.Source` instance.
        """

        name = self.roi.get_source_by_name(name).name

        prefix = kwargs.get('prefix', '')
        write_fits = kwargs.get('write_fits', True)

        self.logger.info('Computing SED for %s' % name)

        o = self._make_sed(name, profile, energies, **kwargs)

        self._plotter.make_sed_plot(self, name, **kwargs)

        if write_fits:
            self._make_sed_fits(o, name, **kwargs)

        self.logger.info('Finished SED')

        return o
    def _make_sed_fits(self, sed, name, **kwargs):

        prefix = kwargs.get('prefix', None)
        name = name.lower().replace(' ', '_')

        # Write a FITS file
        cols = [Column(name='E_MIN', dtype='f8', data=10 ** sed['emin'], unit='MeV'),
                Column(name='E_REF', dtype='f8', data=10 ** sed['ecenter'], unit='MeV'),
                Column(name='E_MAX', dtype='f8', data=10 ** sed['emax'], unit='MeV'),
                Column(name='REF_DFDE_E_MIN', dtype='f8',
                       data=sed['ref_dfde_emin'], unit='ph / (MeV cm2 s)'),
                Column(name='REF_DFDE_E_MAX', dtype='f8',
                       data=sed['ref_dfde_emax'], unit='ph / (MeV cm2 s)'),
                Column(name='REF_DFDE', dtype='f8',
                       data=sed['ref_dfde'], unit='ph / (MeV cm2 s)'),
                Column(name='REF_FLUX', dtype='f8',
                       data=sed['ref_flux'], unit='ph / (cm2 s)'),
                Column(name='REF_EFLUX', dtype='f8',
                       data=sed['ref_eflux'], unit='MeV / (cm2 s)'),
                Column(name='REF_NPRED', dtype='f8', data=sed['ref_npred']),
                Column(name='NORM', dtype='f8', data=sed['norm']),
                Column(name='NORM_ERR', dtype='f8', data=sed['norm_err']),
                Column(name='NORM_ERRP', dtype='f8', data=sed['norm_err_hi']),
                Column(name='NORM_ERRN', dtype='f8', data=sed['norm_err_lo']),
                Column(name='NORM_UL95', dtype='f8', data=sed['norm_ul95']),
                Column(name='TS', dtype='f8', data=sed['ts']),
                Column(name='LOGLIKE', dtype='f8', data=sed['loglike']),
                Column(name='NORM_SCAN', dtype='f8', data=sed['norm_scan']),
                Column(name='DLOGLIKE_SCAN', dtype='f8',
                       data=sed['dloglike_scan']),
                ]

        tab = Table(cols)
        filename = utils.format_filename(self.config['fileio']['workdir'],
                                         'sed', prefix=[prefix, name],
                                         extension='.fits')
        sed['file'] = os.path.basename(filename)

        tab.write(filename, format='fits', overwrite=True)

    def _make_sed(self, name, profile=True, energies=None, **kwargs):

        # Extract options from kwargs
        config = copy.deepcopy(self.config['sed'])
        config.update(kwargs)

        bin_index = config['bin_index']
        use_local_index = config['use_local_index']
        fix_background = config['fix_background']
        ul_confidence = config['ul_confidence']
        cov_scale = config['cov_scale']

        if energies is None:
            energies = self.energies
        else:
            energies = np.array(energies)

        nbins = len(energies) - 1
        max_index = 5.0
        min_flux = 1E-30
        npts = self.config['gtlike']['llscan_npts']
        erange = self.erange

        # Output Dictionary
        o = {'emin': energies[:-1],
             'emax': energies[1:],
             'ecenter': 0.5 * (energies[:-1] + energies[1:]),
             'ref_flux': np.zeros(nbins),
             'ref_eflux': np.zeros(nbins),
             'ref_dfde': np.zeros(nbins),
             'ref_dfde_emin': np.zeros(nbins),
             'ref_dfde_emax': np.zeros(nbins),
             'ref_e2dfde': np.zeros(nbins),
             'ref_npred': np.zeros(nbins),
             'norm': np.zeros(nbins),
             'flux': np.zeros(nbins),
             'eflux': np.zeros(nbins),
             'dfde': np.zeros(nbins),
             'e2dfde': np.zeros(nbins),
             'index': np.zeros(nbins),
             'npred': np.zeros(nbins),
             'ts': np.zeros(nbins),
             'loglike': np.zeros(nbins),
             'norm_scan': np.zeros((nbins, npts)),
             'dloglike_scan': np.zeros((nbins, npts)),
             'loglike_scan': np.zeros((nbins, npts)),
             'fit_quality': np.zeros(nbins),
             'lnlprofile': [],
             'correlation': {},
             'model_flux': {},
             'params': {},
             'config': config}

        for t in ['norm', 'flux', 'eflux', 'dfde', 'e2dfde']:
            o['%s_err' % t] = np.zeros(nbins) * np.nan
            o['%s_err_hi' % t] = np.zeros(nbins) * np.nan
            o['%s_err_lo' % t] = np.zeros(nbins) * np.nan
            o['%s_ul95' % t] = np.zeros(nbins) * np.nan
            o['%s_ul' % t] = np.zeros(nbins) * np.nan

        saved_state = LikelihoodState(self.like)
        source = self.components[0].like.logLike.getSource(name)

        # Perform global spectral fit
        self._latch_free_params()
        self.free_sources(False, pars='shape')
        self.free_source(name)
        self.fit(loglevel=logging.DEBUG, update=False,
                 min_fit_quality=2)
        o['model_flux'] = self.bowtie(name)
        spectral_pars = gtutils.get_pars_dict_from_source(source)
        o['params'] = roi_model.get_params_dict(spectral_pars)
        self._restore_free_params()

        # Setup background parameters for SED
        self.free_sources(False, pars='shape')
        self.free_norm(name)

        if fix_background:
            self.free_sources(free=False)
        elif cov_scale is not None:
            self._latch_free_params()
            self.zero_source(name)
            self.fit(loglevel=logging.DEBUG, update=False)
            srcNames = list(self.like.sourceNames())
            srcNames.remove(name)
            self.constrain_norms(srcNames, cov_scale)
            self.unzero_source(name)
            self._restore_free_params()

        # Precompute fluxes in each bin from global fit
        gf_bin_flux = []
        gf_bin_index = []
        for i, (emin, emax) in enumerate(zip(energies[:-1], energies[1:])):

            delta = 1E-5
            f = self.like[name].flux(10 ** emin, 10 ** emax)
            f0 = self.like[name].flux(10 ** emin * (1 - delta),
                                      10 ** emin * (1 + delta))
            f1 = self.like[name].flux(10 ** emax * (1 - delta),
                                      10 ** emax * (1 + delta))

            if f0 > min_flux:
                g = 1 - np.log10(f0 / f1) / np.log10(10 ** emin / 10 ** emax)
                gf_bin_index += [g]
                gf_bin_flux += [f]
            else:
                gf_bin_index += [max_index]
                gf_bin_flux += [min_flux]

        old_spectrum = source.spectrum()
        self.like.setSpectrum(name, str('PowerLaw'))
        self.free_parameter(name, 'Index', False)
        self.set_parameter(name, 'Prefactor', 1.0, scale=1E-13,
                           true_value=False,
                           bounds=[1E-10, 1E10],
                           update_source=False)
        self.set_parameter(name, 'Scale', 1E3, scale=1.0,
                           bounds=[1, 1E6], update_source=False)

        src_norm_idx = -1
        free_params = self.get_params(True)
        for j, p in enumerate(free_params):
            if not p['is_norm']:
                continue
            if p['is_norm'] and p['src_name'] == name:
                src_norm_idx = j

            o['correlation'][p['src_name']] = np.zeros(nbins) * np.nan

        for i, (emin, emax) in enumerate(zip(energies[:-1], energies[1:])):

            ectr = 0.5 * (emin + emax)
            self.set_parameter(name, 'Scale', 10 ** ectr, scale=1.0,
                               bounds=[1, 1E6], update_source=False)

            if use_local_index:
                o['index'][i] = -min(gf_bin_index[i], max_index)
            else:
                o['index'][i] = -bin_index

            self.set_norm(name, 1.0, update_source=False)
            self.set_parameter(name, 'Index', o['index'][i], scale=1.0,
                               update_source=False)
            self.like.syncSrcParams(name)

            ref_flux = self.like[name].flux(10 ** emin, 10 ** emax)

            o['ref_flux'][i] = self.like[name].flux(10 ** emin, 10 ** emax)
            o['ref_eflux'][i] = self.like[name].energyFlux(10 ** emin,
                                                           10 ** emax)
            o['ref_dfde'][i] = self.like[name].spectrum()(pyLike.dArg(10 ** ectr))
            o['ref_dfde_emin'][i] = self.like[name].spectrum()(pyLike.dArg(10 ** emin))
            o['ref_dfde_emax'][i] = self.like[name].spectrum()(pyLike.dArg(10 ** emax))
            o['ref_e2dfde'][i] = o['ref_dfde'][i] * 10 ** (2.0 * ectr)
            cs = self.model_counts_spectrum(name, emin, emax, summed=True)
            o['ref_npred'][i] = np.sum(cs)

            normVal = self.like.normPar(name).getValue()
            flux_ratio = gf_bin_flux[i] / ref_flux
            newVal = max(normVal * flux_ratio, 1E-10)
            self.set_norm(name, newVal, update_source=False)

            self.like.syncSrcParams(name)
            self.free_norm(name)
            self.logger.debug('Fitting %s SED from %.0f MeV to %.0f MeV' %
                              (name, 10 ** emin, 10 ** emax))
            self.set_energy_range(emin, emax)

            fit_output = self.fit(loglevel=logging.DEBUG, update=False)
            free_params = self.get_params(True)
            for j, p in enumerate(free_params):

                if not p['is_norm']:
                    continue

                o['correlation'][p['src_name']][i] = \
                    fit_output['correlation'][src_norm_idx, j]

            o['fit_quality'][i] = fit_output['fit_quality']

            prefactor = self.like[self.like.par_index(name, 'Prefactor')]

            flux = self.like[name].flux(10 ** emin, 10 ** emax)
            eflux = self.like[name].energyFlux(10 ** emin, 10 ** emax)
            dfde = self.like[name].spectrum()(pyLike.dArg(10 ** ectr))
            # prefactor.getTrueValue()

            o['norm'][i] = flux / o['ref_flux'][i]
            o['flux'][i] = flux
            o['eflux'][i] = eflux
            o['dfde'][i] = dfde
            o['e2dfde'][i] = dfde * 10 ** (2 * ectr)

            cs = self.model_counts_spectrum(name, emin, emax, summed=True)
            o['npred'][i] = np.sum(cs)
            o['ts'][i] = max(self.like.Ts2(name, reoptimize=False), 0.0)
            o['loglike'][i] = -self.like()

            lnlp = self.profile_norm(name, emin=emin, emax=emax,
                                     savestate=False, reoptimize=True,
                                     npts=npts)

            o['loglike_scan'][i] = lnlp['loglike']
            o['dloglike_scan'][i] = lnlp['dloglike']
            o['norm_scan'][i] = lnlp['flux'] / ref_flux
            o['lnlprofile'] += [lnlp]

            ul_data = utils.get_parameter_limits(lnlp['flux'],
                                                 lnlp['dloglike'])

            o['norm_err_hi'][i] = ul_data['err_hi'] / ref_flux
            o['norm_err_lo'][i] = ul_data['err_lo'] / ref_flux

            if np.isfinite(ul_data['err_lo']):
                o['norm_err'][i] = 0.5 * (ul_data['err_lo'] +
                                          ul_data['err_hi']) / ref_flux
            else:
                o['norm_err'][i] = ul_data['err_hi'] / ref_flux

            o['norm_ul95'][i] = ul_data['ul'] / ref_flux

            ul_data = utils.get_parameter_limits(lnlp['flux'],
                                                 lnlp['dloglike'],
                                                 ul_confidence=ul_confidence)
            o['norm_ul'][i] = ul_data['ul'] / ref_flux

        for t in ['flux', 'eflux', 'dfde', 'e2dfde']:
            o['%s_err' % t] = o['norm_err'] * o['ref_%s' % t]
            o['%s_err_hi' % t] = o['norm_err_hi'] * o['ref_%s' % t]
            o['%s_err_lo' % t] = o['norm_err_lo'] * o['ref_%s' % t]
            o['%s_ul95' % t] = o['norm_ul95'] * o['ref_%s' % t]
            o['%s_ul' % t] = o['norm_ul'] * o['ref_%s' % t]

        self.set_energy_range(erange[0], erange[1])
        self.like.setSpectrum(name, old_spectrum)
        saved_state.restore()
        self._sync_params(name)

        if cov_scale is not None:
            self.remove_priors()

        src = self.roi.get_source_by_name(name)
        src.update_data({'sed': copy.deepcopy(o)})

        return o
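
# Example: a minimal usage sketch of the SED interface above.  It assumes
# a fully configured `~fermipy.gtanalysis.GTAnalysis` instance (which mixes
# in SEDGenerator); 'config.yaml' and '3FGL J0000.0+0000' are placeholder
# names, not part of this module.
#
# >>> from fermipy.gtanalysis import GTAnalysis
# >>> gta = GTAnalysis('config.yaml')
# >>> gta.setup()
# >>> sed = gta.sed('3FGL J0000.0+0000', bin_index=2.2)
# >>> sed['norm_ul95']  # 95% C.L. upper limits on the bin normalizations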
class Interpolator(object):
    """Helper class for interpolating a 1-D function from a set of
    tabulated values.  Safely deals with overflows and underflows.
    """

    def __init__(self, x, y):
        """C'tor, takes input arrays of x and y values"""
        x = np.squeeze(np.array(x, ndmin=1))
        y = np.squeeze(np.array(y, ndmin=1))

        msk = np.isfinite(y)
        x = x[msk]
        y = y[msk]

        self._x = x
        self._y = y
        self._xmin = x[0]
        self._xmax = x[-1]
        self._ymin = y[0]
        self._ymax = y[-1]
        self._dydx_lo = (y[1] - y[0]) / (x[1] - x[0])
        self._dydx_hi = (y[-1] - y[-2]) / (x[-1] - x[-2])

        self._fn = UnivariateSpline(x, y, s=0, k=1)
        self._sp = splrep(x, y, k=1, s=0)

    @property
    def xmin(self):
        """ return the minimum value over which the spline is defined """
        return self._xmin

    @property
    def xmax(self):
        """ return the maximum value over which the spline is defined """
        return self._xmax

    @property
    def x(self):
        """ return the x values used to construct the spline """
        return self._x

    @property
    def y(self):
        """ return the y values used to construct the spline """
        return self._y

    def derivative(self, x, der=1):
        """ return the derivative at an array of input values

        x : the inputs

        der : the order of derivative
        """
        return splev(x, self._sp, der=der)

    def __call__(self, x):
        """ Return the interpolated values for an array of inputs

        x : the inputs

        Note that if any x value is outside the interpolation range
        this will return a linear extrapolation based on the slope at
        the endpoint
        """
        x = np.array(x, ndmin=1)

        below_bounds = x < self._xmin
        above_bounds = x > self._xmax

        dxhi = np.array(x - self._xmax)
        dxlo = np.array(x - self._xmin)

        # UnivariateSpline will only accept 1-D arrays so this
        # passes a flattened version of the array.
        y = self._fn(x.ravel())
        y.resize(x.shape)

        y[above_bounds] = (self._ymax + dxhi[above_bounds] * self._dydx_hi)
        y[below_bounds] = (self._ymin + dxlo[below_bounds] * self._dydx_lo)

        return y
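
# Example: Interpolator interpolates inside the tabulated range and falls
# back to linear extrapolation outside of it (toy values chosen purely for
# illustration):
#
# >>> interp = Interpolator([0.0, 1.0, 2.0], [0.0, 1.0, 4.0])
# >>> interp(1.5)           # linear (k=1) interpolation between (1,1) and (2,4)
# >>> interp(3.0)           # above xmax: ymax + (x - xmax)*dydx_hi = 4 + 1*3 = 7
# >>> interp.derivative(0.5)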
class LnLFn(object):
    """Helper class for interpolating a 1-D log-likelihood function from
    a set of tabulated values.
    """

    def __init__(self, x, y, norm_type=0):
        """C'tor, takes input arrays of x and y values

        norm_type : code specifying the quantity used for the flux

           0: Normalization w.r.t. to test source
           1: Flux of the test source ( ph cm^-2 s^-1 )
           2: Energy Flux of the test source ( MeV cm^-2 s^-1 )
           3: Number of predicted photons
           4: Differential flux of the test source ( ph cm^-2 s^-1 MeV^-1 )
           5: Differential energy flux of the test source ( MeV cm^-2 s^-1 MeV^-1 )
        """
        self._interp = Interpolator(x, y)
        self._mle = None
        self._norm_type = norm_type

    @property
    def interp(self):
        """ return the underlying Interpolator object """
        return self._interp

    @property
    def norm_type(self):
        """ return a code specifying the quantity used for the flux

           0: Normalization w.r.t. to test source
           1: Flux of the test source ( ph cm^-2 s^-1 )
           2: Energy Flux of the test source ( MeV cm^-2 s^-1 )
           3: Number of predicted photons
           4: Differential flux of the test source ( ph cm^-2 s^-1 MeV^-1 )
           5: Differential energy flux of the test source ( MeV cm^-2 s^-1 MeV^-1 )
        """
        return self._norm_type

    def _compute_mle(self):
        """ compute the maximum likelihood estimate, using the
        scipy.optimize.brentq method
        """
        if self._interp.y[0] == np.min(self._interp.y):
            self._mle = self._interp.x[0]
        else:
            ix0 = max(np.argmin(self._interp.y) - 4, 0)
            ix1 = min(np.argmin(self._interp.y) + 4,
                      len(self._interp.x) - 1)

            while np.sign(self._interp.derivative(self._interp.x[ix0])) == \
                    np.sign(self._interp.derivative(self._interp.x[ix1])):
                ix0 += 1

            self._mle = scipy.optimize.brentq(self._interp.derivative,
                                              self._interp.x[ix0],
                                              self._interp.x[ix1],
                                              xtol=1e-10 * np.median(self._interp.x))

    def mle(self):
        """ return the maximum likelihood estimate

        This will return the cached value, if it exists
        """
        if self._mle is None:
            self._compute_mle()
        return self._mle

    def fn_mle(self):
        """ return the function value at the maximum likelihood estimate """
        return self._interp(self.mle())

    def TS(self):
        """ return the Test Statistic """
        return 2. * (self._interp(0.) - self._interp(self.mle()))

    def getLimit(self, alpha, upper=True):
        """ Evaluate the limits corresponding to a C.L. of (1-alpha)%.

        Parameters
        ----------
        alpha : limit confidence level.
        upper : upper or lower limits.
        """
        dlnl = utils.cl_to_dlnl(1.0 - alpha)
        lnl_max = self.fn_mle()

        # This is the ultra-safe code to find an absolute maximum:
        # fmax = self.fn_mle()
        # m = (fmax-self.interp.y > 0.1+dlnl) & (self.interp.x>self._mle)
        # if sum(m) == 0:
        #     xmax = self.interp.x[-1]*10
        # else:
        #     xmax = self.interp.x[m][0]

        # Matt has found that it is faster to use an interpolator than
        # an actual root-finder to find the root, probably b/c of
        # python overhead.
        # rf = lambda x: self._interp(x)+dlnl-lnl_max
        if upper:
            x = np.linspace(self._mle, self._interp.xmax, 100)
            # return opt.brentq(rf, self._mle, self._interp.xmax,
            #                   xtol=1e-10*np.abs(self._mle))
        else:
            x = np.linspace(self._mle, self._interp.xmin, 100)
            # return opt.brentq(rf, self._interp.xmin, self._mle,
            #                   xtol=1e-10*np.abs(self._mle))

        retVal = np.interp(dlnl, lnl_max - self.interp(x), x)
        return retVal

    def getInterval(self, alpha):
        """ Evaluate the interval corresponding to a C.L. of (1-alpha)%.

        Parameters
        ----------
        alpha : limit confidence level.
        """
        lo_lim = self.getLimit(alpha, upper=False)
        hi_lim = self.getLimit(alpha, upper=True)
        return (lo_lim, hi_lim)
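
# Example: LnLFn on a toy likelihood curve.  Here y holds the negative
# log-likelihood tabulated on x, with its minimum at x=1 (illustrative
# numbers only):
#
# >>> x = np.linspace(0.0, 5.0, 51)
# >>> y = (x - 1.0) ** 2            # parabolic -log(L), minimum at x=1
# >>> lnl = LnLFn(x, y, norm_type=0)
# >>> lnl.mle()                     # ~1.0
# >>> lnl.TS()                      # 2*(y(0) - y(mle)) = ~2.0
# >>> lnl.getLimit(0.05)            # 95% C.L. upper limit on x
# >>> lnl.getInterval(0.32)         # roughly a 1-sigma interval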
class SpecData(object):
    """This class wraps spectral data, e.g., energy bin definitions,
    flux values and number of predicted photons.
    """

    def __init__(self, emin, emax, dfde, flux, eflux, npred):
        """

        Parameters
        ----------
        emin : `~numpy.ndarray`
           Array of lower bin edges.

        emax : `~numpy.ndarray`
           Array of upper bin edges.

        dfde : `~numpy.ndarray`
           Array of differential flux values.

        flux : `~numpy.ndarray`
           Array of flux values.

        eflux : `~numpy.ndarray`
           Array of energy flux values.

        npred : `~numpy.ndarray`
           Array of numbers of predicted photons.
        """
        self._ebins = np.append(emin, emax[-1])
        self._emin = emin
        self._emax = emax
        self._log_ebins = np.log10(self._ebins)
        self._evals = np.sqrt(self.emin * self.emax)
        self._bin_widths = self._ebins[1:] - self._ebins[0:-1]
        self._dfde = dfde
        self._flux = flux
        self._eflux = eflux
        self._npred = npred
        self._ne = len(self.ebins) - 1

    @property
    def log_ebins(self):
        """ return the log10 of the energy bin edges """
        return self._log_ebins

    @property
    def ebins(self):
        """ return the energy bin edges """
        return self._ebins

    @property
    def emin(self):
        """ return the lower energy bin edges """
        return self._emin

    @property
    def emax(self):
        """ return the upper energy bin edges """
        return self._emax

    @property
    def bin_widths(self):
        """ return the energy bin widths """
        return self._bin_widths

    @property
    def evals(self):
        """ return the energy centers """
        return self._evals

    @property
    def dfde(self):
        """ return the differential flux values """
        return self._dfde

    @property
    def eflux(self):
        """ return the energy flux values """
        return self._eflux

    @property
    def npred(self):
        """ return the number of predicted events """
        return self._npred

    @property
    def nE(self):
        """ return the number of energy bins """
        return self._ne
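
# Example: constructing a SpecData container directly (toy values; in
# practice it is built from a SED FITS file, as in
# CastroData.create_from_fits below):
#
# >>> emin = np.array([100., 1000., 10000.])
# >>> emax = np.array([1000., 10000., 100000.])
# >>> ones = np.ones(3)
# >>> sd = SpecData(emin, emax, dfde=ones, flux=ones, eflux=ones, npred=ones)
# >>> sd.evals      # geometric bin centers, sqrt(emin*emax)
# >>> sd.nE         # number of energy bins -> 3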
class CastroData(object):
    """This class wraps the data needed to make a "Castro" plot,
    namely the log-likelihood as a function of normalization for a
    series of energy bins.
    """

    def __init__(self, norm_vals, nll_vals, specData, norm_type):
        """C'tor

        Parameters
        ----------
        norm_vals : `~numpy.ndarray`
           The normalization values ( nEBins X N array, where N is the
           number of sampled values for each bin )

        nll_vals : `~numpy.ndarray`
           The log-likelihood values ( nEBins X N array, where N is
           the number of sampled values for each bin )

        specData : `~fermipy.sed.SpecData`
           The specData object

        norm_type : str
           String specifying the quantity used for the normalization:

           NORM: Normalization w.r.t. to test source
           FLUX: Flux of the test source ( ph cm^-2 s^-1 )
           EFLUX: Energy Flux of the test source ( MeV cm^-2 s^-1 )
           NPRED: Number of predicted photons
           DFDE: Differential flux of the test source ( ph cm^-2 s^-1 MeV^-1 )
           E2DFDE: Differential energy flux of the test source ( MeV cm^-2 s^-1 MeV^-1 )
        """
        self._norm_vals = norm_vals
        self._nll_vals = nll_vals
        self._specData = specData
        self._norm_type = norm_type
        self._loglikes = []
        self._ne = self._specData.nE
        self._nll_null = 0.0

        for ie in range(self._specData.nE):
            nvv = self._norm_vals[ie]
            nllfunc = LnLFn(nvv, self._nll_vals[ie], self._norm_type)
            self._nll_null += self._nll_vals[ie][0]
            self._loglikes.append(nllfunc)

    @property
    def specData(self):
        """ Return the Spectral Data object """
        return self._specData

    @property
    def norm_type(self):
        """ Return the normalization type flag """
        return self._norm_type

    @property
    def nll_null(self):
        """ Return the negative log-likelihood for the null-hypothesis """
        return self._nll_null

    @staticmethod
    def create_from_fits(fitsfile, norm_type='FLUX',
                         hdu_scan="SCANDATA",
                         hdu_energies="EBOUNDS"):
        """Build a CastroData object from a SED FITS file.

        Parameters
        ----------
        fitsfile : str
           Path to the SED FITS file.

        norm_type : str
           String specifying the quantity used for the normalization,
           either 'FLUX' or 'EFLUX'.

        hdu_scan : str
           Name of the HDU with the likelihood scan data.

        hdu_energies : str
           Name of the HDU with the energy bin definitions.
        """
        tab_s = Table.read(fitsfile, hdu=hdu_scan)
        tab_e = Table.read(fitsfile, hdu=hdu_energies)

        if norm_type == 'FLUX':
            norm_vals = np.array(tab_s['NORM_SCAN'] *
                                 tab_e['REF_FLUX'][:, np.newaxis])
        elif norm_type == 'EFLUX':
            norm_vals = np.array(tab_s['NORM_SCAN'] *
                                 tab_e['REF_EFLUX'][:, np.newaxis])
        else:
            raise Exception('Unrecognized normalization type: %s' % norm_type)

        nll_vals = -np.array(tab_s['DLOGLIKE_SCAN'])

        emin = np.array(tab_e['E_MIN'])
        emax = np.array(tab_e['E_MAX'])
        npred = np.array(tab_s['NORM'] * tab_e['REF_NPRED'])
        dfde = np.array(tab_s['NORM'] * tab_e['REF_DFDE'])
        flux = np.array(tab_s['NORM'] * tab_e['REF_FLUX'])
        eflux = np.array(tab_s['NORM'] * tab_e['REF_EFLUX'])

        sd = SpecData(emin, emax, dfde, flux, eflux, npred)
        return CastroData(norm_vals, nll_vals, sd, norm_type)
    def __getitem__(self, i):
        """ return the LnLFn object for the ith energy bin """
        return self._loglikes[i]

    def __call__(self, x):
        """Return the negative log-likelihood for an array of values,
        summed over the energy bins

        Parameters
        ----------
        x : `~numpy.ndarray`
           Array of nEbins x M values

        Returns
        -------
        nll_val : `~numpy.ndarray`
           Array of negative log-likelihood values.
        """
        nll_val = 0.
        # crude hack to force the fitter away from unphysical values
        if (x < 0).any():
            return 1000.

        for i, xv in enumerate(x):
            nll_val += self._loglikes[i].interp(xv)
        return nll_val

    def derivative(self, x, der=1):
        """Return the derivative of the log-likelihood summed over the
        energy bins

        Parameters
        ----------
        x : `~numpy.ndarray`
           Array of nEbins x M values

        der : int
           Order of the derivative

        Returns
        -------
        der_val : `~numpy.ndarray`
           Array of derivative values.
        """
        der_val = 0.
        for i, xv in enumerate(x):
            der_val += self._loglikes[i].interp.derivative(xv, der=der)
        return der_val

    def mles(self):
        """ return the maximum likelihood estimates for each of the energy bins """
        mle_vals = np.ndarray((self._ne))
        for i in range(self._ne):
            mle_vals[i] = self._loglikes[i].mle()
        return mle_vals

    def fn_mles(self):
        """ returns the summed likelihood at the maximum likelihood estimate

        Note that this simply sums the maximum likelihood values at each
        bin, and does not impose any sort of constraint between bins
        """
        mle_vals = self.mles()
        return self(mle_vals)

    def ts_vals(self):
        """ returns test statistic values for each energy bin """
        ts_vals = np.ndarray((self._ne))
        for i in range(self._ne):
            ts_vals[i] = self._loglikes[i].TS()
        return ts_vals

    def getLimits(self, alpha, upper=True):
        """ Evaluate the limits corresponding to a C.L. of (1-alpha)%.

        Parameters
        ----------
        alpha : limit confidence level.
        upper : upper or lower limits.

        Returns an array of values, one for each energy bin.
        """
        limit_vals = np.ndarray((self._ne))
        for i in range(self._ne):
            limit_vals[i] = self._loglikes[i].getLimit(alpha, upper)
        return limit_vals
    def fitNormalization(self, specVals, xlims):
        """Fit the normalization given a set of spectral values that
        define a spectral shape.

        This version is faster, and solves for the root of the
        derivative.

        Parameters
        ----------
        specVals : an array of (nEbins,) values that define a spectral shape
        xlims : fit limits

        Returns the best-fit normalization value.
        """
        fDeriv = lambda x: self.derivative(specVals * x)
        try:
            result = scipy.optimize.brentq(fDeriv, xlims[0], xlims[1])
        except Exception:
            if self.__call__(specVals * xlims[0]) < self.__call__(specVals * xlims[1]):
                return xlims[0]
            else:
                return xlims[1]
        return result

    def fitNorm_v2(self, specVals):
        """Fit the normalization given a set of spectral values that
        define a spectral shape.

        This version uses scipy.optimize.fmin.

        Parameters
        ----------
        specVals : an array of (nEbins,) values that define a spectral shape

        Returns the best-fit normalization value.
        """
        fToMin = lambda x: self.__call__(specVals * x)
        result = scipy.optimize.fmin(fToMin, 0., disp=False, xtol=1e-6)
        return result
    def fit_spectrum(self, specFunc, initPars):
        """Fit for the free parameters of a spectral function

        Parameters
        ----------
        specFunc : The Spectral Function

        initPars : The initial values of the parameters

        Returns
        -------
        result : tuple
           The output of scipy.optimize.fmin

        spec_out : `~numpy.ndarray`
           The best-fit spectral values

        TS_spec : float
           The TS of the best-fit spectrum
        """
        def fToMin(x):
            return self.__call__(specFunc(x))

        result = scipy.optimize.fmin(fToMin, initPars, disp=False, xtol=1e-6)
        spec_out = specFunc(result)
        TS_spec = self.TS_spectrum(spec_out)
        return result, spec_out, TS_spec

    def spectrum_loglike(self, specType, params, scale=1E3):
        """Return the log-likelihood for a particular spectral model,
        evaluated for a set of parameter values.
        """
        sfn = self.create_functor(specType, scale)[0]
        return self.__call__(sfn(params))

    def TS_spectrum(self, spec_vals):
        """ Calculate the TS for a given set of spectral values """
        return 2. * (self._nll_null - self.__call__(spec_vals))

    def test_spectra(self, spec_types=['PowerLaw', 'LogParabola', 'PLExpCutoff']):
        """Test the SED against a set of spectral models and return a
        dictionary with the fit results for each model.
        """
        retDict = {}
        for specType in spec_types:
            spec_func, init_pars, scaleEnergy = self.create_functor(specType)
            fit_result, fit_spec, fit_ts = self.fit_spectrum(spec_func,
                                                             init_pars)

            specDict = {"Function": spec_func,
                        "Result": fit_result,
                        "Spectrum": fit_spec,
                        "ScaleEnergy": scaleEnergy,
                        "TS": fit_ts}

            retDict[specType] = specDict

        return retDict

    def create_functor(self, specType, scale=1E3):
        """Create a functor object that computes normalizations in a
        sequence of energy bins for a given spectral model.
        """
        emin = self._specData.emin
        emax = self._specData.emax

        if specType == 'PowerLaw':
            initPars = np.array([5e-13, -2.0])
        elif specType == 'LogParabola':
            initPars = np.array([5e-13, -2.0, 0.0])
        elif specType == 'PLExpCutoff':
            initPars = np.array([5e-13, -1.0, 1E4])
        else:
            raise Exception('Unknown spectral type: %s' % specType)

        fn = SpectralFunction.create_functor(specType,
                                             self.norm_type,
                                             emin, emax,
                                             scale=scale)
        return (fn, initPars, scale)
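
# Example: a typical CastroData workflow (a sketch; 'sourcea_sed.fits' is a
# placeholder for a file written by SEDGenerator._make_sed_fits above):
#
# >>> castro = CastroData.create_from_fits('sourcea_sed.fits', norm_type='EFLUX')
# >>> castro.ts_vals()              # per-bin test statistics
# >>> castro.getLimits(0.05)        # per-bin 95% C.L. upper limits
# >>> fn, pars, scale = castro.create_functor('PowerLaw')
# >>> result, spec, ts = castro.fit_spectrum(fn, pars)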
class TSCube(object):
    """This class wraps a TS cube, i.e., the set of likelihood scans
    for each pixel and energy bin of a region, as produced by gttscube.
    """

    def __init__(self, tsmap, normmap, tscube, norm_vals, nll_vals,
                 specData, norm_type):
        """C'tor

        Parameters
        ----------
        tsmap : `~fermipy.skymap.Map`
           A Map object with the TestStatistic values in each pixel

        normmap : `~fermipy.skymap.Map`
           A Map object with the best-fit normalization values in each pixel

        tscube : `~fermipy.skymap.Map`
           A Map object with the TestStatistic values in each pixel &
           energy bin

        norm_vals : `~numpy.ndarray`
           The normalization values ( nEBins X N array, where N is the
           number of sampled values for each bin )

        nll_vals : `~numpy.ndarray`
           The negative log-likelihood values ( nEBins X N array, where
           N is the number of sampled values for each bin )

        specData : `~fermipy.sed.SpecData`
           The specData object

        norm_type : str
           String specifying the quantity used for the normalization

           * NORM : Normalization w.r.t. to test source
           * FLUX : Flux of the test source ( ph cm^-2 s^-1 )
           * EFLUX : Energy Flux of the test source ( MeV cm^-2 s^-1 )
           * NPRED : Number of predicted photons
           * DFDE : Differential flux of the test source ( ph cm^-2 s^-1 MeV^-1 )
           * E2DFDE : E^2 times Differential energy flux of the test source ( MeV cm^-2 s^-1 )
        """
        self._tsmap = tsmap
        self._normmap = normmap
        self._tscube = tscube
        self._ts_cumul = tscube.sum_over_energy()
        self._specData = specData
        self._norm_vals = norm_vals
        self._nll_vals = nll_vals
        self._nE = self._specData.nE
        self._nN = 10
        self._norm_type = norm_type

    @property
    def tsmap(self):
        """ return the Map of the TestStatistic value """
        return self._tsmap

    @property
    def normmap(self):
        """ return the Map of the best-fit normalization value """
        return self._normmap

    @property
    def tscube(self):
        """ return the cube of the TestStatistic value per pixel / energy bin """
        return self._tscube

    @property
    def ts_cumul(self):
        """ return the Map of the cumulative TestStatistic value per pixel
        (summed over energy bins) """
        return self._ts_cumul

    @property
    def specData(self):
        """ Return the Spectral Data object """
        return self._specData

    @property
    def nE(self):
        """ return the number of energy bins """
        return self._nE

    @property
    def nN(self):
        """ return the number of sample points in each energy bin """
        return self._nN

    @staticmethod
    def create_from_fits(fitsfile, norm_type='FLUX'):
        """Build a TSCube object from a fits file created by gttscube

        Parameters
        ----------
        fitsfile : str
           Path to the tscube FITS file.

        norm_type : str
           String specifying the quantity used for the normalization
        """
        m, f = read_map_from_fits(fitsfile)

        tab_e = Table.read(fitsfile, 'EBOUNDS')
        tab_s = Table.read(fitsfile, 'SCANDATA')

        emin = np.array(tab_e['E_MIN'] / 1E3)
        emax = np.array(tab_e['E_MAX'] / 1E3)
        nebins = len(tab_e)
        npred = tab_e['REF_NPRED']

        ndim = len(m.counts.shape)

        if ndim == 2:
            cube_shape = (m.counts.shape[0], m.counts.shape[1], nebins)
        elif ndim == 1:
            cube_shape = (m.counts.shape[0], nebins)
        else:
            raise RuntimeError("Counts map has dimension %i" % (ndim))

        specData = SpecData(emin, emax,
                            np.array(tab_e['REF_DFDE']),
                            np.array(tab_e['REF_FLUX']),
                            np.array(tab_e['REF_EFLUX']),
                            npred)

        nll_vals = -np.array(tab_s["DLOGLIKE_SCAN"])
        norm_vals = np.array(tab_s["NORM_SCAN"])

        wcs_3d = wcs_add_energy_axis(m.wcs, emin)
        c = Map(tab_s["TS"].reshape(cube_shape), wcs_3d)
        n = Map(tab_s["NORM"].reshape(cube_shape), wcs_3d)

        ref_colname = 'REF_%s' % norm_type
        norm_vals *= tab_e[ref_colname][np.newaxis, :, np.newaxis]

        return TSCube(m, n, c, norm_vals, nll_vals, specData,
                      norm_type)
    def castroData_from_ipix(self, ipix, colwise=False):
        """ Build a CastroData object for a particular pixel """
        # pix = utils.skydir_to_pix
        if colwise:
            ipix = self._tsmap.ipix_swap_axes(ipix, colwise)
        norm_d = self._norm_vals[ipix]
        nll_d = self._nll_vals[ipix]
        return CastroData(norm_d, nll_d, self._specData, self._norm_type)

    def castroData_from_pix_xy(self, xy, colwise=False):
        """ Build a CastroData object for a particular pixel """
        ipix = self._tsmap.xy_pix_to_ipix(xy, colwise)
        return self.castroData_from_ipix(ipix)
    def find_and_refine_peaks(self, threshold, min_separation=1.0,
                              use_cumul=False):
        """Find peaks in the TS map above a threshold and refine their
        positions with a parabolic fit.
        """
        if use_cumul:
            theMap = self._ts_cumul
        else:
            theMap = self._tsmap

        peaks = find_peaks(theMap, threshold, min_separation)
        for peak in peaks:
            o = utils.fit_parabola(theMap.counts, peak['iy'], peak['ix'],
                                   dpix=2)
            peak['fit_loc'] = o
            peak['fit_skydir'] = SkyCoord.from_pixel(o['y0'], o['x0'],
                                                     theMap.wcs)
            if o['fit_success']:
                skydir = peak['fit_skydir']
            else:
                skydir = peak['skydir']

        return peaks

    def test_spectra_of_peak(self, peak,
                             spec_types=["PowerLaw", "LogParabola", "PLExpCutoff"]):
        """Build a CastroData object for a peak and test a set of
        spectral models against it.
        """
        castro = self.castroData_from_pix_xy(xy=(peak['ix'], peak['iy']),
                                             colwise=False)
        test_dict = castro.test_spectra(spec_types)
        return (castro, test_dict)

    def find_sources(self, threshold,
                     min_separation=1.0,
                     use_cumul=False,
                     output_peaks=False,
                     output_castro=False,
                     output_specInfo=False,
                     output_src_dicts=False,
                     output_srcs=False):
        """Find sources in the TS maps above a threshold, fit their
        spectra, and return the requested outputs in a dictionary.
        """
        srcs = []
        src_dicts = []
        castros = []
        specInfo = []
        names = []
        peaks = self.find_and_refine_peaks(threshold, min_separation,
                                           use_cumul=use_cumul)
        for i, peak in enumerate(peaks):
            (castro, test_dict) = self.test_spectra_of_peak(peak, ["PowerLaw"])
            src_name = utils.create_source_name(peak['fit_skydir'])
            src_dict = build_source_dict(src_name, peak, test_dict,
                                         "PowerLaw")
            names.append(src_dict["name"])
            if output_castro:
                castros.append(castro)
            if output_specInfo:
                specInfo.append(test_dict)
            if output_src_dicts:
                src_dicts.append(src_dict)
            if output_srcs:
                src = roi_model.Source.create_from_dict(src_dict)
                srcs.append(src)

        retDict = {"Names": names}
        if output_peaks:
            retDict["Peaks"] = peaks
        if output_castro:
            retDict["Castro"] = castros
        if output_specInfo:
            retDict["Spectral"] = specInfo
        if output_src_dicts:
            retDict["SrcDicts"] = src_dicts
        if output_srcs:
            retDict["Sources"] = srcs

        return retDict
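
# Example: the TSCube workflow exercised by the __main__ block at the end
# of this module ('tscube.fits' is a placeholder for gttscube output):
#
# >>> tscube = TSCube.create_from_fits('tscube.fits', 'FLUX')
# >>> out = tscube.find_sources(10.0, min_separation=1.0, output_srcs=True)
# >>> out['Names'], out['Sources']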
def build_source_dict(src_name, peak_dict, spec_dict, spec_type):
    """Build a source dictionary from the name, peak location and
    spectral fit results for a candidate source.
    """
    spec_results = spec_dict[spec_type]
    src_dir = peak_dict['fit_skydir']

    src_dict = dict(name=src_name,
                    Source_Name=src_name,
                    SpatialModel='PointSource',
                    SpectrumType=spec_type,
                    ts=spec_results["TS"][0],
                    ra=src_dir.icrs.ra.deg,
                    dec=src_dir.icrs.dec.deg,
                    Prefactor=spec_results["Result"][0],
                    Index=-1. * spec_results["Result"][1],
                    Scale=spec_results["ScaleEnergy"])

    return src_dict
if __name__ == "__main__": from fermipy import sed from fermipy import roi_model import fermipy.utils as utils import xml.etree.cElementTree as ElementTree import sys if len(sys.argv) == 1: flux_type = "FLUX" else: flux_type = sys.argv[1] tscube = sed.TSCube.create_from_fits("tscube_test.fits",flux_type) resultDict = tscube.find_sources(10.0,1.0,use_cumul=False, output_peaks=True, output_specInfo=True, output_srcs=True) figList = [] peaks = resultDict["Peaks"] specInfos = resultDict["Spectral"] sources = resultDict["Sources"] root = ElementTree.Element('source_library') root.set('title', 'source_library') for src in sources: src.write_xml(root) output_file = open("sed_sources.xml", 'w!') output_file.write(utils.prettify_xml(root)) """ idx_off = -2 for peak in peaks: castro,test_dict = tscube.test_spectra_of_peak(peak) result_pl = test_dict["PowerLaw"]["Result"] result_lp = test_dict["LogParabola"]["Result"] result_pc = test_dict["PLExpCutoff"]["Result"] ts_pl = test_dict["PowerLaw"]["TS"] ts_lp = test_dict["LogParabola"]["TS"] ts_pc = test_dict["PLExpCutoff"]["TS"] print ("Cumulative TS: %.1f"%castro.ts_vals().sum()) print ("TS for PL index free: %.1f (Index = %.2f)"%(ts_pl[0],idx_off-result_pl[1])) print ("TS for LogParabola: %.1f (Index = %.2f, Beta = %.2f)"%(ts_lp[0],idx_off-result_lp[1],result_lp[2])) print ("TS for PLExpCutoff: %.1f (Index = %.2f, E_c = %.2f)"%(ts_pc[0],idx_off-result_pc[1],result_pc[2])) """