Source code for fermipy.spectrum

# Licensed under a 3-clause BSD style license - see LICENSE.rst
from __future__ import absolute_import, division, print_function
import os
import copy
import numpy as np
from scipy.interpolate import RegularGridInterpolator


[docs]def cast_args(x): if isinstance(x, np.ndarray) and x.ndim >= 2: return x return np.expand_dims(np.array(x, ndmin=1), -1)
[docs]def cast_params(params): if isinstance(params[0], np.ndarray) and params[0].ndim >= 2: return list(params) o = [] for i, p in enumerate(params): o += [np.expand_dims(np.array(params[i], ndmin=1), 0)] return o
[docs]class SEDFunctor(object): """Functor object that wraps a `~fermipy.spectrum.SpectralFunction` and computes the normalization of the model in a sequence of SED energy bins. The evaluation method of this class accepts a single vector for the parameters of the model. This class serves as an object that can be passed to likelihood optimizers. """ def __init__(self, sfn, emin, emax): self._emin = emin self._emax = emax self._sfn = sfn @property def emin(self): return self._emin @property def emax(self): return self._emax @property def spectral_fn(self): return self._sfn @property def scale(self): return self._sfn.scale @property def params(self): return self._sfn.params @scale.setter def scale(self, scale): self._sfn.scale = scale @params.setter def params(self, params): self._sfn.params = params
[docs]class SEDFluxFunctor(SEDFunctor): """Functor that computes the flux of a source in a pre-defined sequence of energy bins.""" def __init__(self, sfn, emin, emax): super(SEDFluxFunctor, self).__init__(sfn, emin, emax) def __call__(self, params): params = cast_params(params) emin = np.expand_dims(self.emin, 1) emax = np.expand_dims(self.emax, 1) return np.squeeze(self._sfn.flux(emin, emax, params))
[docs]class SEDEFluxFunctor(SEDFunctor): """Functor that computes the energy flux of a source in a pre-defined sequence of energy bins.""" def __init__(self, sfn, emin, emax): super(SEDEFluxFunctor, self).__init__(sfn, emin, emax) def __call__(self, params): params = cast_params(params) emin = np.expand_dims(self.emin, 1) emax = np.expand_dims(self.emax, 1) return np.squeeze(self._sfn.eflux(emin, emax, params))
[docs]class SpectralFunction(object): """Base class for spectral models. Spectral models inheriting from this class should implement at a minimum an `_eval_dnde` method which evaluates the differential flux at a given energy.""" def __init__(self, params, scale=1.0, extra_params=None): self._params = np.array(params) self._scale = scale self._extra_params = extra_params @property def params(self): """Return parameter vector of the function.""" return self._params @property def log_params(self): """Return transformed parameter vector in which norm and scale parameters are converted to log10.""" return self.params_to_log(self._params) @property def scale(self): return self._scale @scale.setter def scale(self, scale): self._scale = scale @params.setter def params(self, params): if params is None: self._params = np.zeros(self.nparam) else: self._params = np.array(params) @property def extra_params(self): """Dictionary containing additional parameters needed for evaluation of the function.""" return self._extra_params
[docs] @classmethod def create_from_flux(cls, params, emin, emax, flux, scale=1.0): """Create a spectral function instance given its flux.""" params = params.copy() params[0] = 1.0 params[0] = flux / cls.eval_flux(emin, emax, params, scale=scale) return cls(params, scale)
[docs] @classmethod def create_from_eflux(cls, params, emin, emax, eflux, scale=1.0): """Create a spectral function instance given its energy flux.""" params = params.copy() params[0] = 1.0 params[0] = eflux / cls.eval_eflux(emin, emax, params, scale=scale) return cls(params, scale)
[docs] @classmethod def create_functor(cls, spec_type, func_type, emin, emax, params=None, scale=1.0, extra_params=None): if isinstance(spec_type, SpectralFunction): sfn = copy.deepcopy(spec_type) else: sfn = eval(spec_type)(params, scale, extra_params) if func_type.lower() == 'flux': return SEDFluxFunctor(sfn, emin, emax) elif func_type.lower() == 'eflux': return SEDEFluxFunctor(sfn, emin, emax) else: raise Exception("Did not recognize func_type: %s" % func_type)
[docs] @classmethod def create_flux_functor(cls, emin, emax, params=None, scale=1.0, extra_params=None): sfn = cls(params, scale, extra_params) return SEDFluxFunctor(sfn, emin, emax)
[docs] @classmethod def create_eflux_functor(cls, emin, emax, params=None, scale=1.0, extra_params=None): sfn = cls(params, scale, extra_params) return SEDEFluxFunctor(sfn, emin, emax)
[docs] @classmethod def eval_e2dnde(cls, x, params, scale=1.0, extra_params=None): x = cast_args(x) params = cast_params(params) return cls._eval_dnde(x, params, scale, extra_params) * x**2
[docs] @classmethod def eval_ednde(cls, x, params, scale=1.0, extra_params=None): x = cast_args(x) params = cast_params(params) return cls._eval_dnde(x, params, scale, extra_params) * x
[docs] @classmethod def eval_dnde(cls, x, params, scale=1.0, extra_params=None): x = cast_args(x) params = cast_params(params) return cls._eval_dnde(x, params, scale, extra_params)
[docs] @classmethod def eval_dnde_deriv(cls, x, params, scale=1.0, extra_params=None): x = cast_args(x) params = cast_params(params) return cls._eval_dnde_deriv(x, params, scale, extra_params)
[docs] @classmethod def eval_ednde_deriv(cls, x, params, scale=1.0, extra_params=None): x = cast_args(x) params = cast_params(params) dnde_deriv = cls._eval_dnde_deriv(x, params, scale, extra_params) dnde = cls._eval_dnde(x, params, scale) return x * dnde_deriv + dnde
[docs] @classmethod def eval_e2dnde_deriv(cls, x, params, scale=1.0, extra_params=None): x = cast_args(x) params = cast_params(params) dnde_deriv = cls._eval_dnde_deriv(x, params, scale, extra_params) dnde = cls._eval_dnde(x, params, scale) return x**2 * dnde_deriv + 2 * x * dnde
@classmethod def _integrate(cls, fn, emin, emax, params, scale=1.0, extra_params=None, npt=20): """Fast numerical integration method using mid-point rule.""" emin = np.expand_dims(emin, -1) emax = np.expand_dims(emax, -1) params = copy.deepcopy(params) for i, p in enumerate(params): params[i] = np.expand_dims(params[i], -1) xedges = np.linspace(0.0, 1.0, npt + 1) logx_edge = np.log(emin) + xedges * (np.log(emax) - np.log(emin)) logx = 0.5 * (logx_edge[..., 1:] + logx_edge[..., :-1]) xw = np.exp(logx_edge[..., 1:]) - np.exp(logx_edge[..., :-1]) dnde = fn(np.exp(logx), params, scale, extra_params) return np.sum(dnde * xw, axis=-1) @classmethod def _eval_dnde_deriv(cls, x, params, scale=1.0, extra_params=None, eps=1E-6): return (cls._eval_dnde(x + eps, params, scale) - cls._eval_dnde(x, params, scale)) / eps
[docs] @classmethod def eval_flux(cls, emin, emax, params, scale=1.0, extra_params=None): emin = cast_args(emin) emax = cast_args(emax) params = cast_params(params) return cls._integrate(cls.eval_dnde, emin, emax, params, scale, extra_params)
[docs] @classmethod def eval_eflux(cls, emin, emax, params, scale=1.0, extra_params=None): emin = cast_args(emin) emax = cast_args(emax) params = cast_params(params) return cls._integrate(cls.eval_ednde, emin, emax, params, scale, extra_params)
[docs] def dnde(self, x, params=None): """Evaluate differential flux.""" params = self.params if params is None else params return np.squeeze(self.eval_dnde(x, params, self.scale, self.extra_params))
[docs] def ednde(self, x, params=None): """Evaluate E times differential flux.""" params = self.params if params is None else params return np.squeeze(self.eval_ednde(x, params, self.scale, self.extra_params))
[docs] def e2dnde(self, x, params=None): """Evaluate E^2 times differential flux.""" params = self.params if params is None else params return np.squeeze(self.eval_e2dnde(x, params, self.scale, self.extra_params))
[docs] def dnde_deriv(self, x, params=None): """Evaluate derivative of the differential flux with respect to E.""" params = self.params if params is None else params return np.squeeze(self.eval_dnde_deriv(x, params, self.scale, self.extra_params))
[docs] def ednde_deriv(self, x, params=None): """Evaluate derivative of E times differential flux with respect to E.""" params = self.params if params is None else params return np.squeeze(self.eval_ednde_deriv(x, params, self.scale, self.extra_params))
[docs] def e2dnde_deriv(self, x, params=None): """Evaluate derivative of E^2 times differential flux with respect to E.""" params = self.params if params is None else params return np.squeeze(self.eval_e2dnde_deriv(x, params, self.scale, self.extra_params))
[docs] def flux(self, emin, emax, params=None): """Evaluate the integral flux.""" params = self.params if params is None else params return np.squeeze(self.eval_flux(emin, emax, params, self.scale, self.extra_params))
[docs] def eflux(self, emin, emax, params=None): """Evaluate the integral energy flux.""" params = self.params if params is None else params return np.squeeze(self.eval_eflux(emin, emax, params, self.scale, self.extra_params))
[docs]class PowerLaw(SpectralFunction): """Class that evaluates a power-law function with the parameterization: F(x) = p_0 * (x/x_s)^p_1 where x_s is the scale parameter. The `params` array should be defined with: * params[0] : Prefactor (p_0) * params[1] : Index (p_1) """ def __init__(self, params=None, scale=1.0, extra_params=None): params = (params if params is not None else np.array([5e-13, -2.0])) super(PowerLaw, self).__init__(params, scale)
[docs] @staticmethod def nparam(): return 2
@staticmethod def _eval_dnde(x, params, scale=1.0, extra_params=None): return params[0] * (x / scale) ** params[1]
[docs] @staticmethod def eval_flux(emin, emax, params, scale=1.0, extra_params=None): phi0 = np.array(params[0], ndmin=1) index = np.array(params[1], ndmin=1) x0 = scale index1 = index + 1 m = np.isclose(index1, 0.0) iindex0 = np.zeros(index.shape) iindex1 = np.zeros(index.shape) iindex1[~m] = 1. / index1[~m] iindex0[m] = 1. xmin = emin / scale xmax = emax / scale v = phi0 * iindex1 * (emax * xmax**index - emin * xmin**index) if np.any(m): v += phi0 * x0 * iindex0 * (np.log(emax) - np.log(emin)) return v
[docs] @classmethod def eval_eflux(cls, emin, emax, params, scale=1.0, extra_params=None): params = copy.deepcopy(params) params[1] += 1.0 return cls.eval_flux(emin, emax, params, scale) * scale
[docs] @classmethod def eval_norm(cls, scale, index, emin, emax, flux): return flux / cls.eval_flux(emin, emax, [1.0, index], scale=scale)
[docs]class LogParabola(SpectralFunction): """Class that evaluates a function with the parameterization: F(x) = p_0 * (x/x_s)^(p_1 - p_2*log(x/x_s) ) where x_s is a scale parameter. The `params` array should be defined with: * params[0] : Prefactor (p_0) * params[1] : Index (p_1) * params[2] : Curvature (p_2) """ def __init__(self, params=None, scale=1.0, extra_params=None): params = (params if params is not None else np.array([5e-13, -2.0, 0.0])) super(LogParabola, self).__init__(params, scale)
[docs] @staticmethod def nparam(): return 3
@staticmethod def _eval_dnde(x, params, scale=1.0, extra_params=None): return (params[0] * (x / scale) ** (params[1] - params[2] * np.log(x / scale)))
[docs]class PLExpCutoff(SpectralFunction): """Class that evaluates a function with the parameterization: F(x) = p_0 * (x/x_s)^(p_1) * exp(- x/p_2 ) where x_s is the scale parameter. The `params` array should be defined with: * params[0] : Prefactor (p_0) * params[1] : Index (p_1) * params[2] : Cutoff (p_2) """ def __init__(self, params=None, scale=1.0, extra_params=None): params = (params if params is not None else np.array([5e-13, -1.0, 1E4])) super(PLExpCutoff, self).__init__(params, scale)
[docs] @staticmethod def params_to_log(params): return [np.log10(params[0]), params[1], np.log10(params[2])]
[docs] @staticmethod def log_to_params(params): return [10**params[0], params[1], 10**params[2]]
[docs] @staticmethod def nparam(): return 3
@staticmethod def _eval_dnde(x, params, scale=1.0, extra_params=None): return params[0] * (x / scale) ** (params[1]) * np.exp(-x / params[2]) @classmethod def _eval_dnde_deriv(cls, x, params, scale=1.0, extra_params=None): return (cls._eval_dnde(x, params, scale) * (params[1] * params[2] - x) / (params[2] * x))
[docs]class PLSuperExpCutoff(SpectralFunction): """Class that evaluates a function with the parameterization: F(x) = p_0 * (x/x_s)^(p_1) * exp(-(x/p2)**p3) where x_s is the scale parameter. The `params` array should be defined with: * params[0] : Prefactor (p_0) * params[1] : Index1 (p_1) * params[2] : Curvature (p_2) * params[3] : Index2 (p3) """ def __init__(self, params=None, scale=1.0, extra_params=None): params = (params if params is not None else np.array([5e-13, -1.0, 1E4, 1.0])) super(PLSuperExpCutoff, self).__init__(params, scale)
[docs] @staticmethod def params_to_log(params): return [np.log10(params[0]), params[1], np.log10(params[2]), params[3]]
[docs] @staticmethod def log_to_params(params): return [10**params[0], params[1], 10**params[2], params[3]]
[docs] @staticmethod def nparam(): return 4
@staticmethod def _eval_dnde(x, params, scale=1.0, extra_params=None): return params[0] * (x / scale) ** (params[1]) * np.exp(- (x / params[2]) ** (params[3])) @classmethod def _eval_dnde_deriv(cls, x, params, scale=1.0, extra_params=None): return (cls._eval_dnde(x, params, scale) * (params[1] * params[2] - x) / (params[2] * x))
[docs]class DMFitFunction(SpectralFunction): """Class that evaluates the spectrum for a DM particle of a given mass, channel, cross section, and J-factor. The parameterization is given by: F(x) = 1 / (8 * pi) * (1/mass^2) * sigmav * J * dN/dE(E,mass,i) where the `params` array should be defined with: * params[0] : sigmav (or tau for decay) * params[1] : mass Note that this class assumes that mass and J-factor are provided in units of GeV and GeV^2 cm^-5 while energies are defined in MeV. For decay the D-factor is in uits of GeV cm^-2 s """ # Mapping between the ST channel codes and the rows in the gammamc # file channel_index_mapping = { 1: 8, # ee 2: 6, # mumu 3: 3, # tautau 4: 1, # bb 5: 2, # tt 6: 7, # gg 7: 4, # ww 8: 5, # zz 9: 0, # cc 10: 10, # uu 11: 11, # dd 12: 9, # ss 101: 8, # ee 102: 6, # mumu 103: 3, # tautau 104: 1, # bb 105: 2, # tt 106: 7, # gg 107: 4, # ww 108: 5, # zz 109: 0, # cc 110: 10, # uu 111: 11, # dd 112: 9, # ss } # Mapping between ST channel codes and string aliases channel_name_mapping = { 1: ["e+e-", "ee"], 2: ["mu+mu-", "mumu", "musrc"], 3: ["tau+tau-", "tautau", "tausrc"], 4: ["bb-bar", "bb", "bbbar", "bbsrc"], 5: ["tt-bar", "tt"], 6: ["gluons", "gg"], 7: ["W+W-", "w+w-", "ww", "wwsrc"], 8: ["ZZ", "zz"], 9: ["cc-bar", "cc"], 10: ["uu-bar", "uu"], 11: ["dd-bar", "dd"], 12: ["ss-bar", "ss"], 101: ["e+e-_decay", "ee_decay"], 102: ["mu+mu-_decay", "mumu_decay", "musrc_decay"], 103: ["tau+tau-_decay", "tautau_decay", "tausrc_decay"], 104: ["bb-bar_decay", "bb_decay", "bbbar_decay", "bbsrc_decay"], 105: ["tt-bar_decay", "tt_decay"], 106: ["gluons_decay", "gg_decay"], 107: ["W+W-_decay", "w+w-_decay", "ww_decay", "wwsrc_decay"], 108: ["ZZ_decay", "zz_decay"], 109: ["cc-bar_decay", "cc_decay"], 110: ["uu-bar_decay", "uu_decay"], 111: ["dd-bar_decay", "dd_decay"], 112: ["ss-bar_decay", "ss_decay"]} channel_shortname_mapping = { 1: "ee", 2: "mumu", 3: "tautau", 4: "bb", 5: "tt", 6: "gg", 7: "ww", 8: "zz", 9: "cc", 10: "uu", 11: "dd", 12: "ss", 101: "ee", 102: "mumu", 103: "tautau", 104: "bb", 105: "tt", 106: "gg", 107: "ww", 108: "zz", 109: "cc", 110: "uu", 111: "dd", 112: "ss"} channel_rev_map = {vv: k for k, v in channel_name_mapping.items() for vv in v} @property def ann_channel_names(self): return [ self.channel_shortname_mapping[k] for k in range(1,13) ] @property def decay_channel_names(self): return [ self.channel_shortname_mapping[k] for k in range(101,113) ] def __init__(self, params, chan='bb', jfactor=1E19, tablepath=None, dfactor=1E17): """Constructor. Parameters ---------- params : list Parameter vector. chan : str Channel string. Can be one of cc, bb, tt, tautau, ww, zz, mumu, gg, ee, ss, uu, dd, or any of the above with _decay appended. jfactor : float J-factor of this object. Note that this needs to be given in the same units as the mass parameter. tablepath : str Path to lookup table with pre-computed DM spectra on a grid of energy, mass, and channel. dfactor : float D-factor of this object. Note that this needs to be given in the same units as the mass parameter. """ if tablepath is None: tablepath = os.path.join('$FERMIPY_DATA_DIR', 'gammamc_dif.dat') data = np.loadtxt(os.path.expandvars(tablepath)) # Number of decades in x = log10(E/M) ndec = 10.0 xedge = np.linspace(0, 1.0, 251) self._x = 0.5 * (xedge[1:] + xedge[:-1]) * ndec - ndec chan_code = DMFitFunction.channel_rev_map[chan] ichan = DMFitFunction.channel_index_mapping[chan_code] self._chan = chan self._chan_code = chan_code # These are the mass points self._mass = np.array([2.0, 4.0, 6.0, 8.0, 10.0, 25.0, 50.0, 80.3, 91.2, 100.0, 150.0, 176.0, 200.0, 250.0, 350.0, 500.0, 750.0, 1000.0, 1500.0, 2000.0, 3000.0, 5000.0, 7000.0, 1E4]) self._dndx = data.reshape((12, 24, 250)) self._dndx_interp = RegularGridInterpolator([self._mass, self._x], self._dndx[ichan, :, :], bounds_error=False, fill_value=None) extra_params = dict(dndx_interp=self._dndx_interp, chan=chan_code, jfactor=jfactor, dfactor=dfactor) super(DMFitFunction, self).__init__(params, 1.0, extra_params) @property def chan(self): """Return the channel string.""" return self._chan @property def chan_code(self): """Return the channel code.""" return self._chan_code @property def decay(self): """Return True if this is a decay spectrum """ return self._chan_code >= 100
[docs] @staticmethod def nparam(): return 2
[docs] @staticmethod def channels(): """ Return all available DMFit channel strings """ return DMFitFunction.channel_rev_map.keys()
@staticmethod def _eval_dnde(x, params, scale=1.0, extra_params=None): dndx_interp = extra_params.get('dndx_interp') chan = extra_params.get('chan') mass = params[1] xm = np.log10(x / mass) - 3.0 if chan >= 100: tau = params[0] dfactor = extra_params.get('dfactor') phip = 1. / (4. * np.pi) * (dfactor / ( mass*tau )) else: sigmav = params[0] jfactor = extra_params.get('jfactor') phip = 1. / (8. * np.pi) * np.power(mass, -2) * (sigmav * jfactor) #dndx = self._dndx_interp[ichan]((np.log10(mass),xm)) dndx = dndx_interp((mass, xm)) dndx[xm > 0] = 0 return phip * dndx / x
[docs] def set_channel(self, chan): if isinstance(chan, int): ichan = DMFitFunction.channel_index_mapping[chan] self._chan = DMFitFunction.channel_shortname_mapping[chan] self._chan_code = chan else: self._chan = chan self._chan_code = DMFitFunction.channel_rev_map[chan] ichan = DMFitFunction.channel_index_mapping[self._chan_code] self._dndx_interp = RegularGridInterpolator([self._mass, self._x], self._dndx[ichan, :, :], bounds_error=False, fill_value=None) self.extra_params['dndx_interp'] = self._dndx_interp self.extra_params['chan'] = self._chan_code