
#!/usr/bin/env python
#

# Description
"""
Utilities for dealing with SEDs

Many parts of this code are taken from dsphs/like/lnlfn.py by 
  Matthew Wood <mdwood@slac.stanford.edu>
  Alex Drlica-Wagner <kadrlica@slac.stanford.edu>
"""

import copy
import logging

import numpy as np
from scipy.interpolate import UnivariateSpline, splrep, splev
import scipy.optimize as opt
import scipy.special as spf
from scipy.integrate import quad
import scipy
import astropy.io.fits as pf
from astropy.coordinates import SkyCoord

import fermipy.config
import fermipy.defaults as defaults
import fermipy.utils as utils
from fermipy.utils import read_energy_bounds, read_spectral_data
from fermipy.fits_utils import read_map_from_fits
from fermipy.logger import Logger
from fermipy.logger import logLevel
from fermipy.sourcefind import find_peaks, refine_peak
# roi_model is needed at module level by TSCube.find_sources()
from fermipy import roi_model

from LikelihoodState import LikelihoodState

# Some useful functions


def alphaToDeltaLogLike_1DOF(alpha):
    """Return the delta log-likelihood corresponding to a confidence
    level of (1 - alpha) for 1 degree of freedom.
    """
    dlnl = pow(np.sqrt(2.) * spf.erfinv(1 - 2 * alpha), 2.) / 2.
    return dlnl

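# A quick sanity check of the conversion above (illustrative sketch,
# not part of the original module): for alpha = 0.05 the delta
# log-likelihood should be ~1.35, i.e. 1.645**2 / 2.
#
#   >>> alphaToDeltaLogLike_1DOF(0.05)   # doctest: +SKIP
#   1.3527...
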
FluxTypes = ['NORM', 'FLUX', 'EFLUX', 'NPRED', 'DIF_FLUX', 'DIF_EFLUX']

PAR_NAMES = {"PowerLaw": ["Prefactor", "Index"],
             "LogParabola": ["norm", "alpha", "beta"],
             "PLExpCutoff": ["Prefactor", "Index1", "Cutoff"]}


# class SEDGenerator(fermipy.config.Configurable):
class SEDGenerator(object):
    """Mixin class which provides SED functionality to
    `~fermipy.gtanalysis.GTAnalysis`."""
    def sed(self, name, profile=True, energies=None, **kwargs):
        """Generate a spectral energy distribution (SED) for a source.
        This function will fit the normalization of the source in each
        energy bin.  By default the SED will be generated with the
        analysis energy bins but a custom binning can be defined with
        the ``energies`` parameter.

        Parameters
        ----------
        name : str
            Source name.

        prefix : str
            Optional string that will be prepended to all output files
            (FITS and rendered images).

        profile : bool
            Profile the likelihood in each energy bin.

        energies : `~numpy.ndarray`
            Sequence of energies in log10(E/MeV) defining the edges of
            the energy bins.  If this argument is None then the
            analysis energy bins will be used.  The energies in this
            sequence must align with the bin edges of the underlying
            analysis instance.

        bin_index : float
            Spectral index that will be used when fitting the energy
            distribution within an energy bin.

        use_local_index : bool
            Use a power-law approximation to the shape of the global
            spectrum in each bin.  If this is false then a constant
            index set to ``bin_index`` will be used.

        fix_background : bool
            Fix background components when fitting the flux
            normalization in each energy bin.  If fix_background=False
            then all background parameters that are currently free in
            the fit will be profiled.  By default fix_background=True.

        ul_confidence : float
            Set the confidence level that will be used for the
            calculation of flux upper limits in each energy bin.

        cov_scale : float
            Scaling factor that will be applied when setting the
            gaussian prior on the normalization of free background
            sources.  If this parameter is None then no gaussian prior
            will be applied.

        Returns
        -------
        sed : dict
            Dictionary containing the output of the SED analysis.  This
            dictionary is also saved to the 'sed' dictionary of the
            `~fermipy.roi_model.Source` instance.
        """

        name = self.roi.get_source_by_name(name, True).name
        prefix = kwargs.get('prefix', '')

        self.logger.info('Computing SED for %s' % name)

        o = self._make_sed(name, profile, energies, **kwargs)

        self._plotter.make_sed_plot(self, name, **kwargs)

        self.logger.info('Finished SED')

        return o

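    # A minimal usage sketch (not part of the original module).  The
    # GTAnalysis instance ``gta`` and the source name are hypothetical
    # placeholders:
    #
    #   >>> out = gta.sed('sourceA', bin_index=2.0)      # doctest: +SKIP
    #   >>> out['flux'], out['flux_err'], out['ts']      # doctest: +SKIP
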
    def _make_sed(self, name, profile=True, energies=None, **kwargs):

        # Extract options from kwargs
        config = copy.deepcopy(self.config['sed'])
        config.update(kwargs)

        bin_index = config['bin_index']
        use_local_index = config['use_local_index']
        fix_background = config['fix_background']
        ul_confidence = config['ul_confidence']
        cov_scale = config['cov_scale']

        if energies is None:
            energies = self.energies
        else:
            energies = np.array(energies)

        nbins = len(energies) - 1
        max_index = 5.0
        min_flux = 1E-30
        erange = self.erange

        # Output Dictionary
        o = {'emin': energies[:-1],
             'emax': energies[1:],
             'ecenter': 0.5 * (energies[:-1] + energies[1:]),
             'flux': np.zeros(nbins),
             'eflux': np.zeros(nbins),
             'dfde': np.zeros(nbins),
             'e2dfde': np.zeros(nbins),
             'flux_err': np.zeros(nbins),
             'eflux_err': np.zeros(nbins),
             'dfde_err': np.zeros(nbins),
             'e2dfde_err': np.zeros(nbins),
             'flux_ul95': np.zeros(nbins) * np.nan,
             'eflux_ul95': np.zeros(nbins) * np.nan,
             'dfde_ul95': np.zeros(nbins) * np.nan,
             'e2dfde_ul95': np.zeros(nbins) * np.nan,
             'flux_ul': np.zeros(nbins) * np.nan,
             'eflux_ul': np.zeros(nbins) * np.nan,
             'dfde_ul': np.zeros(nbins) * np.nan,
             'e2dfde_ul': np.zeros(nbins) * np.nan,
             'dfde_err_lo': np.zeros(nbins) * np.nan,
             'e2dfde_err_lo': np.zeros(nbins) * np.nan,
             'dfde_err_hi': np.zeros(nbins) * np.nan,
             'e2dfde_err_hi': np.zeros(nbins) * np.nan,
             'index': np.zeros(nbins),
             'Npred': np.zeros(nbins),
             'ts': np.zeros(nbins),
             'fit_quality': np.zeros(nbins),
             'lnlprofile': [],
             'correlation': {},
             'model_flux': {},
             'config': config
             }

        saved_state = LikelihoodState(self.like)

        # Perform global spectral fit
        self._latch_free_params()
        self.free_sources(False, pars='shape')
        self.free_source(name)
        self.fit(loglevel=logging.DEBUG, update=False, min_fit_quality=2)
        o['model_flux'] = self.bowtie(name)
        self._restore_free_params()

        # Setup background parameters for SED
        self.free_sources(False, pars='shape')
        self.free_norm(name)

        if fix_background:
            self.free_sources(free=False)
        elif cov_scale is not None:
            self._latch_free_params()
            self.zero_source(name)
            self.fit(loglevel=logging.DEBUG, update=False)
            srcNames = list(self.like.sourceNames())
            srcNames.remove(name)
            self.constrain_norms(srcNames, cov_scale)
            self.unzero_source(name)
            self._restore_free_params()

        # Precompute fluxes in each bin from global fit
        gf_bin_flux = []
        gf_bin_index = []
        for i, (emin, emax) in enumerate(zip(energies[:-1], energies[1:])):

            delta = 1E-5
            f = self.like[name].flux(10 ** emin, 10 ** emax)
            f0 = self.like[name].flux(10 ** emin * (1 - delta),
                                      10 ** emin * (1 + delta))
            f1 = self.like[name].flux(10 ** emax * (1 - delta),
                                      10 ** emax * (1 + delta))

            if f0 > min_flux:
                g = 1 - np.log10(f0 / f1) / np.log10(10 ** emin / 10 ** emax)
                gf_bin_index += [g]
                gf_bin_flux += [f]
            else:
                gf_bin_index += [max_index]
                gf_bin_flux += [min_flux]

        source = self.components[0].like.logLike.getSource(name)
        old_spectrum = source.spectrum()
        self.like.setSpectrum(name, 'PowerLaw')
        self.free_parameter(name, 'Index', False)
        self.set_parameter(name, 'Prefactor', 1.0, scale=1E-13,
                           true_value=False,
                           bounds=[1E-10, 1E10],
                           update_source=False)

        src_norm_idx = -1
        free_params = self.get_params(True)
        for j, p in enumerate(free_params):
            if not p['is_norm']:
                continue
            if p['is_norm'] and p['src_name'] == name:
                src_norm_idx = j

            o['correlation'][p['src_name']] = np.zeros(nbins) * np.nan

        for i, (emin, emax) in enumerate(zip(energies[:-1], energies[1:])):

            ecenter = 0.5 * (emin + emax)
            self.set_parameter(name, 'Scale', 10 ** ecenter, scale=1.0,
                               bounds=[1, 1E6], update_source=False)

            if use_local_index:
                o['index'][i] = -min(gf_bin_index[i], max_index)
            else:
                o['index'][i] = -bin_index

            self.set_parameter(name, 'Index', o['index'][i], scale=1.0,
                               update_source=False)

            normVal = self.like.normPar(name).getValue()
            flux_ratio = (gf_bin_flux[i] /
                          self.like[name].flux(10 ** emin, 10 ** emax))
            newVal = max(normVal * flux_ratio, 1E-10)
            self.set_norm(name, newVal)

            self.like.syncSrcParams(name)
            self.free_norm(name)
            self.logger.debug('Fitting %s SED from %.0f MeV to %.0f MeV' %
                              (name, 10 ** emin, 10 ** emax))
            self.setEnergyRange(emin, emax)

            fit_output = self.fit(loglevel=logging.DEBUG, update=False)
            free_params = self.get_params(True)
            for j, p in enumerate(free_params):

                if not p['is_norm']:
                    continue

                o['correlation'][p['src_name']][i] = \
                    fit_output['correlation'][src_norm_idx, j]

            o['fit_quality'][i] = fit_output['fit_quality']

            prefactor = self.like[self.like.par_index(name, 'Prefactor')]

            flux = self.like[name].flux(10 ** emin, 10 ** emax)
            flux_err = self.like.fluxError(name, 10 ** emin, 10 ** emax)
            eflux = self.like[name].energyFlux(10 ** emin, 10 ** emax)
            eflux_err = self.like.energyFluxError(name, 10 ** emin,
                                                  10 ** emax)
            dfde = prefactor.getTrueValue()
            dfde_err = dfde * flux_err / flux
            e2dfde = dfde * 10 ** (2 * ecenter)

            o['flux'][i] = flux
            o['eflux'][i] = eflux
            o['dfde'][i] = dfde
            o['e2dfde'][i] = e2dfde
            o['flux_err'][i] = flux_err
            o['eflux_err'][i] = eflux_err
            o['dfde_err'][i] = dfde_err
            o['e2dfde_err'][i] = dfde_err * 10 ** (2 * ecenter)

            cs = self.model_counts_spectrum(name, emin, emax, summed=True)
            o['Npred'][i] = np.sum(cs)
            o['ts'][i] = max(self.like.Ts2(name, reoptimize=False), 0.0)

            if profile:
                lnlp = self.profile_norm(name, emin=emin, emax=emax,
                                         savestate=False, reoptimize=True,
                                         npts=20)

                o['lnlprofile'] += [lnlp]

                ul_data = utils.get_parameter_limits(lnlp['flux'],
                                                     lnlp['dlogLike'])

                o['flux_ul95'][i] = ul_data['ul']
                o['eflux_ul95'][i] = ul_data['ul'] * (lnlp['eflux'][-1] /
                                                      lnlp['flux'][-1])
                o['dfde_ul95'][i] = ul_data['ul'] * (lnlp['dfde'][-1] /
                                                     lnlp['flux'][-1])
                o['e2dfde_ul95'][i] = o['dfde_ul95'][i] * 10 ** (2 * ecenter)
                o['dfde_err_hi'][i] = ul_data['err_hi'] * (lnlp['dfde'][-1] /
                                                           lnlp['flux'][-1])
                o['e2dfde_err_hi'][i] = (o['dfde_err_hi'][i] *
                                         10 ** (2 * ecenter))
                o['dfde_err_lo'][i] = ul_data['err_lo'] * (lnlp['dfde'][-1] /
                                                           lnlp['flux'][-1])
                o['e2dfde_err_lo'][i] = (o['dfde_err_lo'][i] *
                                         10 ** (2 * ecenter))

                ul_data = utils.get_parameter_limits(
                    lnlp['flux'], lnlp['dlogLike'],
                    ul_confidence=ul_confidence)

                o['flux_ul'][i] = ul_data['ul']
                o['eflux_ul'][i] = ul_data['ul'] * (lnlp['eflux'][-1] /
                                                    lnlp['flux'][-1])
                o['dfde_ul'][i] = ul_data['ul'] * (lnlp['dfde'][-1] /
                                                   lnlp['flux'][-1])
                o['e2dfde_ul'][i] = o['dfde_ul'][i] * 10 ** (2 * ecenter)

        self.setEnergyRange(erange[0], erange[1])
        self.like.setSpectrum(name, old_spectrum)
        saved_state.restore()

        if cov_scale is not None:
            self.remove_priors()

        src = self.roi.get_source_by_name(name, True)
        src.update_data({'sed': copy.deepcopy(o)})

        return o

class Interpolator(object):
    """Helper class for interpolating a 1-D function from a set of
    tabulated values.  Safely deals with overflows and underflows.
    """

    def __init__(self, x, y):
        """C'tor, takes input arrays of x and y values"""
        x = np.squeeze(np.array(x, ndmin=1))
        y = np.squeeze(np.array(y, ndmin=1))

        msk = np.isfinite(y)
        x = x[msk]
        y = y[msk]

        y -= np.max(y)

        self._x = x
        self._y = y
        self._xmin = x[0]
        self._xmax = x[-1]
        self._ymin = y[0]
        self._ymax = y[-1]
        self._dydx_lo = (y[1] - y[0]) / (x[1] - x[0])
        self._dydx_hi = (y[-1] - y[-2]) / (x[-1] - x[-2])

        self._fn = UnivariateSpline(x, y, s=0, k=2)
        self._sp = splrep(x, y, k=2, s=0)

    @property
    def xmin(self):
        """Return the minimum value over which the spline is defined."""
        return self._xmin

    @property
    def xmax(self):
        """Return the maximum value over which the spline is defined."""
        return self._xmax

    @property
    def x(self):
        """Return the x values used to construct the spline."""
        return self._x

    @property
    def y(self):
        """Return the y values used to construct the spline."""
        return self._y
    def derivative(self, x, der=1):
        """Return the derivative at an array of input values.

        x : the inputs
        der : the order of derivative
        """
        return splev(x, self._sp, der=der)
    def __call__(self, x):
        """Return the interpolated values for an array of inputs.

        x : the inputs

        Note that if any x value is outside the interpolation range
        this will return a linear extrapolation based on the slope at
        the nearest endpoint.
        """
        x = np.array(x, ndmin=1)

        below_bounds = x < self._xmin
        above_bounds = x > self._xmax

        dxhi = np.array(x - self._xmax)
        dxlo = np.array(x - self._xmin)

        # UnivariateSpline will only accept 1-D arrays so this
        # passes a flattened version of the array.
        y = self._fn(x.ravel())
        y.resize(x.shape)

        y[above_bounds] = (self._ymax + dxhi[above_bounds] * self._dydx_hi)
        y[below_bounds] = (self._ymin + dxlo[below_bounds] * self._dydx_lo)
        return y

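# A minimal sketch of how Interpolator behaves (illustrative only):
# values inside the tabulated range are spline-interpolated and values
# outside it are linearly extrapolated from the endpoint slopes.
#
#   >>> x = np.linspace(0., 10., 11)
#   >>> interp = Interpolator(x, -(x - 5.) ** 2)
#   >>> interp(np.array([5., 20.]))          # doctest: +SKIP
#   array([   0., -115.])                    # -115 = -25 + 10 * (-9)
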
class LnLFn(object):
    """Helper class for interpolating a 1-D log-likelihood function
    from a set of tabulated values.
    """

    def __init__(self, x, y, fluxType=0):
        """C'tor, takes input arrays of x and y values.

        fluxType : code specifying the quantity used for the flux

           0: Normalization w.r.t. the test source
           1: Flux of the test source ( ph cm^-2 s^-1 )
           2: Energy flux of the test source ( MeV cm^-2 s^-1 )
           3: Number of predicted photons
           4: Differential flux of the test source ( ph cm^-2 s^-1 MeV^-1 )
           5: Differential energy flux of the test source ( MeV cm^-2 s^-1 MeV^-1 )
        """
        self._interp = Interpolator(x, y)
        self._mle = None
        self._fluxType = fluxType

    @property
    def interp(self):
        """Return the underlying Interpolator object."""
        return self._interp

    @property
    def fluxType(self):
        """Return the code specifying the quantity used for the flux.
        See `__init__` for the meaning of each code.
        """
        return self._fluxType

    def _compute_mle(self):
        """Compute the maximum likelihood estimate by finding the root
        of the derivative with scipy.optimize.brentq.
        """
        if self._interp.y[0] == np.max(self._interp.y):
            self._mle = self._interp.x[0]
        else:
            ix0 = max(np.argmax(self._interp.y) - 4, 0)
            ix1 = min(np.argmax(self._interp.y) + 4,
                      len(self._interp.x) - 1)

            while np.sign(self._interp.derivative(self._interp.x[ix0])) == \
                    np.sign(self._interp.derivative(self._interp.x[ix1])):
                ix0 += 1

            self._mle = scipy.optimize.brentq(
                self._interp.derivative,
                self._interp.x[ix0], self._interp.x[ix1],
                xtol=1e-10 * np.median(self._interp.x))
    def mle(self):
        """Return the maximum likelihood estimate.

        This will return the cached value if it exists.
        """
        if self._mle is None:
            self._compute_mle()
        return self._mle

    def fn_mle(self):
        """Return the function value at the maximum likelihood
        estimate."""
        return self._interp(self.mle())

    def TS(self):
        """Return the Test Statistic."""
        return 2. * (self._interp(self.mle()) - self._interp(0.))
    def getLimit(self, alpha, upper=True):
        """Evaluate the limits corresponding to a C.L. of (1-alpha).

        Parameters
        ----------
        alpha : limit confidence level.
        upper : upper or lower limits.
        """
        dlnl = alphaToDeltaLogLike_1DOF(alpha)
        lnl_max = self.fn_mle()

        # Ultra-safe code to find an absolute maximum:
        # fmax = self.fn_mle()
        # m = (fmax-self.interp.y > 0.1+dlnl) & (self.interp.x>self._mle)
        # if sum(m) == 0:
        #     xmax = self.interp.x[-1]*10
        # else:
        #     xmax = self.interp.x[m][0]

        # Matt has found that it is faster to use an interpolator than
        # an actual root-finder here, probably b/c of python overhead.
        # rf = lambda x: self._interp(x)+dlnl-lnl_max
        if upper:
            x = np.linspace(self._mle, self._interp.xmax, 100)
            # return opt.brentq(rf, self._mle, self._interp.xmax,
            #                   xtol=1e-10*np.abs(self._mle))
        else:
            x = np.linspace(self._mle, self._interp.xmin, 100)
            # return opt.brentq(rf, self._interp.xmin, self._mle,
            #                   xtol=1e-10*np.abs(self._mle))

        retVal = np.interp(dlnl, lnl_max - self.interp(x), x)
        return retVal
    def getInterval(self, alpha):
        """Evaluate the interval corresponding to a C.L. of (1-alpha).

        Parameters
        ----------
        alpha : limit confidence level.
        """
        lo_lim = self.getLimit(alpha, upper=False)
        hi_lim = self.getLimit(alpha, upper=True)
        return (lo_lim, hi_lim)

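# A minimal sketch of LnLFn on a parabolic log-likelihood curve
# (illustrative only): the MLE, TS and upper limit can be read off the
# tabulated curve.
#
#   >>> norm = np.linspace(0., 10., 101)
#   >>> fn = LnLFn(norm, -0.5 * (norm - 4.) ** 2)   # peaked at norm = 4
#   >>> fn.mle()                             # doctest: +SKIP
#   4.0...
#   >>> fn.TS()                              # 2 * (0 - (-8)) = 16
#   >>> fn.getLimit(0.05)                    # ~5.64 (= 4 + 1.645)
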
class SpecData(object):
    """This class wraps spectral data, e.g., energy bin definitions,
    flux values and the number of predicted photons.
    """

    def __init__(self, ebins, fluxes, npreds):
        """C'tor, takes the energy bin edges, fluxes and Npreds."""
        self._ebins = ebins
        self._log_ebins = np.log10(self._ebins)
        self._evals = np.sqrt(self._ebins[0:-1] * self._ebins[1:])
        self._bin_widths = self._ebins[1:] - self._ebins[0:-1]
        self._fluxes = fluxes
        self._efluxes = self._ebins * self._fluxes
        self._npreds = npreds
        self._ne = len(ebins) - 1

    @property
    def log_ebins(self):
        """Return the log10 of the energy bin edges."""
        return self._log_ebins

    @property
    def ebins(self):
        """Return the energy bin edges."""
        return self._ebins

    @property
    def bin_widths(self):
        """Return the energy bin widths."""
        return self._bin_widths

    @property
    def evals(self):
        """Return the energy bin centers."""
        return self._evals

    @property
    def fluxes(self):
        """Return the flux values."""
        return self._fluxes

    @property
    def efluxes(self):
        """Return the energy flux values."""
        return self._efluxes

    @property
    def npreds(self):
        """Return the number of predicted events."""
        return self._npreds

    @property
    def nE(self):
        """Return the number of energy bins."""
        return self._ne

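# A minimal sketch of constructing a SpecData object (illustrative
# only; the flux and Npred values are made up).  Fluxes are evaluated
# at the bin edges; npreds has one entry per bin.
#
#   >>> ebins = np.logspace(2., 5., 13)      # 12 bins, 100 MeV - 100 GeV
#   >>> fluxes = 1e-9 * (ebins / 100.) ** -2.
#   >>> npreds = np.full(12, 100.)
#   >>> sd = SpecData(ebins, fluxes, npreds)
#   >>> sd.nE, sd.evals[0]                   # doctest: +SKIP
#   (12, 133.3...)
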
class CastroData(object):
    """This class wraps the data needed to make a "Castro" plot,
    namely the log-likelihood as a function of normalization for a
    series of energy bins.
    """

    def __init__(self, norm_vals, nll_vals, specData, fluxType):
        """C'tor

        Parameters
        ----------
        norm_vals : `~numpy.ndarray`
            The normalization values ( nEBins X N array, where N is
            the number of sampled values for each bin )

        nll_vals : `~numpy.ndarray`
            The log-likelihood values ( nEBins X N array, where N is
            the number of sampled values for each bin )

        specData : `~fermipy.sed.SpecData`
            The SpecData object

        fluxType : int
            Code specifying the quantity used for the flux

            0: Normalization w.r.t. the test source
            1: Flux of the test source ( ph cm^-2 s^-1 )
            2: Energy flux of the test source ( MeV cm^-2 s^-1 )
            3: Number of predicted photons
            4: Differential flux of the test source ( ph cm^-2 s^-1 MeV^-1 )
            5: Differential energy flux of the test source ( MeV cm^-2 s^-1 MeV^-1 )
        """
        self._norm_vals = norm_vals
        self._nll_vals = nll_vals
        self._specData = specData
        self._fluxType = fluxType
        self._loglikes = []
        self._ne = self._specData.nE
        self._nll_null = 0.0

        if fluxType == 0:
            factors = np.ones((self._specData.nE))
        elif fluxType == 1:
            factors = np.sqrt(self._specData.fluxes[0:-1] *
                              self._specData.fluxes[1:]) * \
                self._specData.bin_widths
        elif fluxType == 2:
            factors = np.sqrt(self._specData.efluxes[0:-1] *
                              self._specData.efluxes[1:]) * \
                self._specData.bin_widths
        elif fluxType == 3:
            factors = self._specData.npreds
        elif fluxType == 4:
            factors = np.sqrt(self._specData.fluxes[0:-1] *
                              self._specData.fluxes[1:])
        elif fluxType == 5:
            factors = np.sqrt(self._specData.efluxes[0:-1] *
                              self._specData.efluxes[1:])
        else:
            raise ValueError('Unknown flux type: %s. Options are 0-5.'
                             % fluxType)

        for ie in range(self._specData.nE):
            nvv = factors[ie] * self._norm_vals[ie]
            nllfunc = LnLFn(nvv, self._nll_vals[ie], self._fluxType)
            self._nll_null -= self._nll_vals[ie][0]
            self._loglikes.append(nllfunc)

    @property
    def specData(self):
        """Return the SpecData object."""
        return self._specData

    @property
    def fluxType(self):
        """Return the flux type flag."""
        return self._fluxType

    @property
    def nll_null(self):
        """Return the negative log-likelihood for the null
        hypothesis."""
        return self._nll_null
    def __getitem__(self, i):
        """Return the LnLFn object for the ith energy bin."""
        return self._loglikes[i]

    def __call__(self, x):
        """Return the negative log-likelihood for an array of values,
        summed over the energy bins.

        Parameters
        ----------
        x : `~numpy.ndarray`
            Array of nEbins x M values

        Returns
        -------
        nll_val : `~numpy.ndarray`
            Array of negative log-likelihood values.
        """
        nll_val = 0.
        # crude hack to force the fitter away from unphysical values
        if (x < 0).any():
            return 1000.

        for i, xv in enumerate(x):
            nll_val -= self._loglikes[i].interp(xv)
        return nll_val

    def derivative(self, x, der=1):
        """Return the derivative of the negative log-likelihood,
        summed over the energy bins.

        Parameters
        ----------
        x : `~numpy.ndarray`
            Array of nEbins x M values

        der : int
            Order of the derivative

        Returns
        -------
        der_val : `~numpy.ndarray`
            Array of derivative values.
        """
        der_val = 0.
        for i, xv in enumerate(x):
            der_val -= self._loglikes[i].interp.derivative(xv, der=der)
        return der_val

    def mles(self):
        """Return the maximum likelihood estimates for each of the
        energy bins."""
        mle_vals = np.ndarray((self._ne))
        for i in range(self._ne):
            mle_vals[i] = self._loglikes[i].mle()
        return mle_vals

    def fn_mles(self):
        """Return the summed likelihood at the maximum likelihood
        estimate.

        Note that this simply sums the maximum likelihood values in
        each bin and does not impose any sort of constraint between
        bins.
        """
        mle_vals = self.mles()
        return self(mle_vals)

    def ts_vals(self):
        """Return the test statistic values for each energy bin."""
        ts_vals = np.ndarray((self._ne))
        for i in range(self._ne):
            ts_vals[i] = self._loglikes[i].TS()
        return ts_vals

    def getLimits(self, alpha, upper=True):
        """Evaluate the limits corresponding to a C.L. of (1-alpha).

        Parameters
        ----------
        alpha : limit confidence level.
        upper : upper or lower limits.

        Returns an array of limit values, one for each energy bin.
        """
        limit_vals = np.ndarray((self._ne))
        for i in range(self._ne):
            limit_vals[i] = self._loglikes[i].getLimit(alpha, upper)
        return limit_vals
    def fitNormalization(self, specVals, xlims):
        """Fit the normalization given a set of spectral values that
        define a spectral shape.

        This version is faster and solves for the root of the
        derivative.

        Parameters
        ----------
        specVals : an array of nEbins values that define a spectral shape
        xlims : fit limits

        Returns the best-fit normalization value.
        """
        fDeriv = lambda x: self.derivative(specVals * x)
        try:
            result = scipy.optimize.brentq(fDeriv, xlims[0], xlims[1])
        except Exception:
            if self.__call__(specVals * xlims[0]) < \
                    self.__call__(specVals * xlims[1]):
                return xlims[0]
            else:
                return xlims[1]
        return result

    def fitNorm_v2(self, specVals):
        """Fit the normalization given a set of spectral values that
        define a spectral shape.

        This version uses scipy.optimize.fmin.

        Parameters
        ----------
        specVals : an array of nEbins values that define a spectral shape

        Returns the best-fit normalization value.
        """
        fToMin = lambda x: self.__call__(specVals * x)
        result = scipy.optimize.fmin(fToMin, 0., disp=False, xtol=1e-6)
        return result

    def fit_spectrum(self, specFunc, initPars):
        """Fit for the free parameters of a spectral function.

        Parameters
        ----------
        specFunc : The spectral function
        initPars : The initial values of the parameters

        Returns
        -------
        result : tuple
            The output of scipy.optimize.fmin

        spec_out : `~numpy.ndarray`
            The best-fit spectral values

        TS_spec : float
            The TS of the best-fit spectrum
        """
        fToMin = lambda x: self.__call__(specFunc(x))
        result = scipy.optimize.fmin(fToMin, initPars, disp=False,
                                     xtol=1e-6)
        spec_out = specFunc(result)
        TS_spec = self.TS_spectrum(spec_out)
        return result, spec_out, TS_spec

    def TS_spectrum(self, spec_vals):
        """Calculate the TS for a given set of spectral values."""
        return 2. * (self._nll_null - self.__call__(spec_vals))
    def test_spectra(self, spec_types=["PowerLaw", "LogParabola",
                                       "PLExpCutoff"]):
        """Fit a set of test spectral shapes and return the fit
        results for each."""
        retDict = {}
        for specType in spec_types:
            spec_func, init_pars, scaleEnergy = \
                self.buildTestSpectrumFunction(specType)
            fit_result, fit_spec, fit_ts = \
                self.fit_spectrum(spec_func, init_pars)

            # tweak the fit result to account for the flux type
            if self._fluxType == 0:
                fit_result[0] *= (self._specData.fluxes[0] *
                                  self._specData.bin_widths[0])
                fit_result[1] -= 2.
            elif self._fluxType == 1:
                # fit_result[0] *= 1.
                fit_result[1] -= 1.
            elif self._fluxType == 2:
                fit_result[0] /= self._specData.ebins[0]
                fit_result[1] -= 2.
            elif self._fluxType == 3:
                fit_result[0] *= (self._specData.fluxes[0] *
                                  self._specData.bin_widths[0] /
                                  self._specData.npreds[0])
                fit_result[1] -= 1.
            elif self._fluxType == 4:
                fit_result[0] *= self._specData.bin_widths[0]
            elif self._fluxType == 5:
                fit_result[0] *= (self._specData.bin_widths[0] /
                                  self._specData.ebins[0])
                fit_result[1] -= 1.

            specDict = {"Function": spec_func,
                        "Result": fit_result,
                        "Spectrum": fit_spec,
                        "ScaleEnergy": scaleEnergy,
                        "TS": fit_ts}

            retDict[specType] = specDict

        return retDict
    def buildTestSpectrumFunction(self, specType):
        """Build a test spectrum function and the matching initial
        parameters for the current flux type."""
        scaleEnergy = self._specData.ebins[0]
        cutoffEnergy = 10. * scaleEnergy

        # The initial parameters depend on how the flux is expressed
        if self._fluxType == 0:
            initPars = np.array([1e-3, 0.0, 0.0])
            initPars_pc = np.array([1e-3, 0.0, cutoffEnergy])
        elif self._fluxType == 1:
            initPars = np.array([1e-12, -1.0, 0.0])
            initPars_pc = np.array([1e-12, -1.0, cutoffEnergy])
        elif self._fluxType == 2:
            initPars = np.array([1e-7, 0.0, 0.0])
            initPars_pc = np.array([1e-7, 0.0, cutoffEnergy])
        elif self._fluxType == 3:
            initPars = np.array([1.0, -2.0, 0.0])
            initPars_pc = np.array([1.0, -2.0, cutoffEnergy])
        elif self._fluxType == 4:
            initPars = np.array([1e-17, -2.0, 0.0])
            initPars_pc = np.array([1e-17, -2.0, cutoffEnergy])
        elif self._fluxType == 5:
            initPars = np.array([1e-12, -1.0, 0.0])
            initPars_pc = np.array([1e-12, -1.0, cutoffEnergy])

        # Build a function and return it with the correct initial
        # parameters
        if specType == "PowerLaw":
            return (PowerLaw(self._specData.evals, scaleEnergy),
                    initPars[0:2], scaleEnergy)
        elif specType == "LogParabola":
            return (LogParabola(self._specData.evals, scaleEnergy),
                    initPars, scaleEnergy)
        elif specType == "PLExpCutoff":
            return (PLExpCutoff(self._specData.evals, scaleEnergy),
                    initPars_pc, scaleEnergy)
        else:
            print("Did not recognize test spectrum type %s" % specType)
        return None

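# A minimal sketch of using CastroData (illustrative only): two energy
# bins, each with a parabolic likelihood curve in the normalization,
# wrapped with a hypothetical two-bin SpecData ``sd2``.
#
#   >>> sd2 = SpecData(np.array([1e2, 1e3, 1e4]),
#   ...                1e-9 * np.ones(3), 100. * np.ones(2))
#   >>> norms = np.linspace(0., 10., 50)
#   >>> lls = np.array([-0.5 * (norms - 2.) ** 2,
#   ...                 -0.5 * (norms - 3.) ** 2])
#   >>> cd = CastroData(np.array([norms, norms]), lls, sd2, 0)
#   >>> cd.mles()                            # doctest: +SKIP
#   array([ 2.,  3.])
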
class TSCube(object):
    """This class wraps a TSCube: test statistic and normalization
    values evaluated on a grid of pixels and energy bins.
    """

    def __init__(self, tsmap, normmap, tscube, norm_vals, nll_vals,
                 specData, fluxType):
        """C'tor

        Parameters
        ----------
        tsmap : `~fermipy.utils.Map`
            A Map object with the TestStatistic values in each pixel

        normmap : `~fermipy.utils.Map`
            A Map object with the best-fit normalization values in
            each pixel

        tscube : `~fermipy.utils.Map`
            A Map object with the TestStatistic values in each pixel &
            energy bin

        norm_vals : `~numpy.ndarray`
            The normalization values ( nEBins X N array, where N is
            the number of sampled values for each bin )

        nll_vals : `~numpy.ndarray`
            The log-likelihood values ( nEBins X N array, where N is
            the number of sampled values for each bin )

        specData : `~fermipy.sed.SpecData`
            The SpecData object

        fluxType : int
            Code specifying the quantity used for the flux

            0: Normalization w.r.t. the test source
            1: Flux of the test source ( ph cm^-2 s^-1 )
            2: Energy flux of the test source ( MeV cm^-2 s^-1 )
            3: Number of predicted photons
            4: Differential flux of the test source ( ph cm^-2 s^-1 MeV^-1 )
            5: Differential energy flux of the test source ( MeV cm^-2 s^-1 MeV^-1 )
        """
        self._tsmap = tsmap
        self._normmap = normmap
        self._tscube = tscube
        self._ts_cumul = tscube.sum_over_energy()
        self._specData = specData
        self._norm_vals = norm_vals
        self._nll_vals = nll_vals
        self._nE = self._specData.nE
        self._nN = 10
        self._castro_shape = (self._nN, self._nE)
        self._fluxType = fluxType

    @property
    def tsmap(self):
        """Return the Map of the TestStatistic values."""
        return self._tsmap

    @property
    def normmap(self):
        """Return the Map of the best-fit normalization values."""
        return self._normmap

    @property
    def tscube(self):
        """Return the cube of TestStatistic values per pixel / energy
        bin."""
        return self._tscube

    @property
    def ts_cumul(self):
        """Return the Map of the cumulative TestStatistic values per
        pixel (summed over energy bins)."""
        return self._ts_cumul

    @property
    def specData(self):
        """Return the SpecData object."""
        return self._specData

    @property
    def nE(self):
        """Return the number of energy bins."""
        return self._nE

    @property
    def nN(self):
        """Return the number of sample points in each energy bin."""
        return self._nN

    @staticmethod
    def create_from_fits(fitsfile, fluxType):
        """Build a TSCube object from a FITS file created by
        gttscube."""
        m, f = read_map_from_fits(fitsfile)
        n, f = read_map_from_fits(fitsfile, "N_MAP")
        c, f = read_map_from_fits(fitsfile, "TSCUBE")

        log_ebins, fluxes, npreds = read_spectral_data(f["EBOUNDS"])
        ebins = np.power(10., log_ebins)
        specData = SpecData(ebins, fluxes, npreds)

        cube_data_hdu = f["SCANDATA"]
        nll_vals = cube_data_hdu.data.field("NLL_SCAN")
        norm_vals = cube_data_hdu.data.field("NORMSCAN")

        return TSCube(m, n, c, norm_vals, nll_vals, specData, fluxType)

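    # A minimal usage sketch (the file name and pixel coordinates are
    # placeholders; see also the __main__ block at the end of this
    # module):
    #
    #   >>> tsc = TSCube.create_from_fits("tscube.fits", 0)  # doctest: +SKIP
    #   >>> castro = tsc.castroData_from_pix_xy((40, 40))    # doctest: +SKIP
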
    def castroData_from_ipix(self, ipix, colwise=False):
        """Build a CastroData object for a particular pixel."""
        # pix = utils.skydir_to_pix
        if colwise:
            ipix = self._tsmap.ipix_swap_axes(ipix, colwise)
        norm_d = self._norm_vals[ipix].reshape(
            self._castro_shape).swapaxes(0, 1)
        nll_d = self._nll_vals[ipix].reshape(
            self._castro_shape).swapaxes(0, 1)
        return CastroData(norm_d, nll_d, self._specData, self._fluxType)

    def castroData_from_pix_xy(self, xy, colwise=False):
        """Build a CastroData object for a particular pixel."""
        ipix = self._tsmap.xy_pix_to_ipix(xy, colwise)
        return self.castroData_from_ipix(ipix)
    def find_and_refine_peaks(self, threshold, min_separation=1.0,
                              use_cumul=False):
        """Find peaks in the TS map and refine their positions with a
        parabolic fit."""
        if use_cumul:
            theMap = self._ts_cumul
        else:
            theMap = self._tsmap

        peaks = find_peaks(theMap, threshold, min_separation)
        for peak in peaks:
            o = utils.fit_parabola(theMap.counts, peak['iy'], peak['ix'],
                                   dpix=2)
            peak['fit_loc'] = o
            peak['fit_skydir'] = SkyCoord.from_pixel(o['y0'], o['x0'],
                                                     theMap.wcs)
            if o['fit_success']:
                skydir = peak['fit_skydir']
            else:
                skydir = peak['skydir']

        return peaks

    def test_spectra_of_peak(self, peak, spec_types=["PowerLaw",
                                                     "LogParabola",
                                                     "PLExpCutoff"]):
        """Fit the test spectra at the location of a peak."""
        castro = self.castroData_from_pix_xy(xy=(peak['ix'], peak['iy']),
                                             colwise=False)
        test_dict = castro.test_spectra(spec_types)
        return (castro, test_dict)

    def find_sources(self, threshold,
                     min_separation=1.0,
                     use_cumul=False,
                     output_peaks=False,
                     output_castro=False,
                     output_specInfo=False,
                     output_src_dicts=False,
                     output_srcs=False):
        """Find sources in the TS maps and optionally return the
        intermediate fit products."""
        srcs = []
        src_dicts = []
        castros = []
        specInfo = []
        names = []
        peaks = self.find_and_refine_peaks(threshold, min_separation,
                                           use_cumul=use_cumul)
        for i, peak in enumerate(peaks):
            (castro, test_dict) = self.test_spectra_of_peak(peak,
                                                            ["PowerLaw"])
            src_name = utils.create_source_name(peak['fit_skydir'])
            src_dict = build_source_dict(src_name, peak, test_dict,
                                         "PowerLaw")
            names.append(src_dict["name"])
            if output_castro:
                castros.append(castro)
            if output_specInfo:
                specInfo.append(test_dict)
            if output_src_dicts:
                src_dicts.append(src_dict)
            if output_srcs:
                src = roi_model.Source.create_from_dict(src_dict)
                srcs.append(src)

        retDict = {"Names": names}
        if output_peaks:
            retDict["Peaks"] = peaks
        if output_castro:
            retDict["Castro"] = castros
        if output_specInfo:
            retDict["Spectral"] = specInfo
        if output_src_dicts:
            retDict["SrcDicts"] = src_dicts
        if output_srcs:
            retDict["Sources"] = srcs
        return retDict
def PowerLaw(evals, scale):
    """Build a power-law spectral shape function."""
    evals_scaled = evals / scale
    return lambda x: x[0] * np.power(evals_scaled, x[1])


def LogParabola(evals, scale):
    """Build a log-parabola spectral shape function."""
    evals_scaled = evals / scale
    log_evals_scaled = np.log(evals_scaled)
    return lambda x: x[0] * np.power(evals_scaled,
                                     x[1] - x[2] * log_evals_scaled)


def PLExpCutoff(evals, scale):
    """Build a power-law with exponential cutoff spectral shape
    function."""
    evals_scaled = evals / scale
    evals_diff = scale - evals
    return lambda x: x[0] * np.power(evals_scaled, x[1]) * \
        np.exp(evals_diff / x[2])

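# A minimal sketch of the spectral shape builders above (illustrative
# only): each returns a closure mapping a parameter vector to the
# spectral values at ``evals``.
#
#   >>> evals = np.array([100., 1000., 10000.])
#   >>> plaw = PowerLaw(evals, 100.)
#   >>> plaw(np.array([1e-12, -2.0]))        # doctest: +SKIP
#   array([ 1.e-12,  1.e-14,  1.e-16])
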
def build_source_dict(src_name, peak_dict, spec_dict, spec_type):
    """Build a source dictionary from the output of the spectral
    fits."""
    spec_results = spec_dict[spec_type]
    src_dir = peak_dict['fit_skydir']

    src_dict = dict(name=src_name,
                    Source_Name=src_name,
                    SpatialModel='PointSource',
                    SpectrumType=spec_type,
                    ts=spec_results["TS"][0],
                    ra=src_dir.icrs.ra.deg,
                    dec=src_dir.icrs.dec.deg,
                    Prefactor=spec_results["Result"][0],
                    Index=-1. * spec_results["Result"][1],
                    Scale=spec_results["ScaleEnergy"])
    return src_dict

if __name__ == "__main__":

    from fermipy import sed
    from fermipy import roi_model
    import fermipy.utils as utils
    import xml.etree.cElementTree as ElementTree
    import sys

    if len(sys.argv) == 1:
        flux_type = 0
    elif sys.argv[1] == "norm":
        flux_type = 0
    elif sys.argv[1] == "flux":
        flux_type = 1
    elif sys.argv[1] == "eflux":
        flux_type = 2
    elif sys.argv[1] == "npred":
        flux_type = 3
    elif sys.argv[1] == "d_flux":
        flux_type = 4
    elif sys.argv[1] == "d_eflux":
        flux_type = 5
    else:
        print("Did not recognize flux type %s, choose from "
              "norm | flux | eflux | npred | d_flux | d_eflux"
              % sys.argv[1])
        sys.exit(1)

    tscube = sed.TSCube.create_from_fits("tscube_test.fits", flux_type)
    resultDict = tscube.find_sources(10.0, 1.0, use_cumul=True,
                                     output_peaks=True,
                                     output_specInfo=True,
                                     output_srcs=True)

    peaks = resultDict["Peaks"]
    specInfos = resultDict["Spectral"]
    sources = resultDict["Sources"]

    root = ElementTree.Element('source_library')
    root.set('title', 'source_library')

    for src in sources:
        src.write_xml(root)

    """
    result_pl = test_dict["PowerLaw"]["Result"]
    result_lp = test_dict["LogParabola"]["Result"]
    result_pc = test_dict["PLExpCutoff"]["Result"]
    ts_pl = test_dict["PowerLaw"]["TS"]
    ts_lp = test_dict["LogParabola"]["TS"]
    ts_pc = test_dict["PLExpCutoff"]["TS"]
    print "TS for PL index = 2:  %.1f" % max_ts
    print "Cumulative TS:        %.1f" % castro.ts_vals().sum()
    print "TS for PL index free: %.1f (Index = %.2f)" % \
        (ts_pl[0], idx_off - result_pl[1])
    print "TS for LogParabola:   %.1f (Index = %.2f, Beta = %.2f)" % \
        (ts_lp[0], idx_off - result_lp[1], result_lp[2])
    print "TS for PLExpCutoff:   %.1f (Index = %.2f, E_c = %.2f)" % \
        (ts_pc[0], idx_off - result_pc[1], result_pc[2])
    """

    output_file = open("sed_sources.xml", 'w')
    output_file.write(utils.prettify_xml(root))
    output_file.close()