Source code for fermipy.spectrum

# Licensed under a 3-clause BSD style license - see LICENSE.rst
from __future__ import absolute_import, division, print_function
import copy
import numpy as np


[docs]def cast_args(x): if isinstance(x, np.ndarray) and x.ndim >= 2: return x return np.expand_dims(np.array(x, ndmin=1), -1)
[docs]def cast_params(params): if isinstance(params[0], np.ndarray) and params[0].ndim >= 2: return list(params) o = [] for i, p in enumerate(params): o += [np.expand_dims(np.array(params[i], ndmin=1), 0)] return o
[docs]class SEDFunctor(object): """Functor that accepts a model parameter vector and computes the normalization of a spectral model in a sequence of SED energy bins. """ def __init__(self, spectrum, scale, emin, emax): self._emin = emin self._emax = emax self._scale = scale self._spectrum = spectrum @property def emin(self): return self._emin @property def emax(self): return self._emax @property def scale(self): return self._scale
[docs]class SEDFluxFunctor(SEDFunctor): """Functor that computes the flux of a source in a sequence of SED energy bins.""" def __init__(self, spectrum, scale, emin, emax): super(SEDFluxFunctor, self).__init__(spectrum, scale, emin, emax) def __call__(self, params): params = cast_params(params) emin = np.expand_dims(self.emin, 1) emax = np.expand_dims(self.emax, 1) return np.squeeze(self._spectrum.eval_flux(emin, emax, params, self.scale))
[docs]class SEDEFluxFunctor(SEDFunctor): """Functor that computes the energy flux of a source in a sequence of SED energy bins.""" def __init__(self, spectrum, scale, emin, emax): super(SEDEFluxFunctor, self).__init__(spectrum, scale, emin, emax) def __call__(self, params): params = cast_params(params) emin = np.expand_dims(self.emin, 1) emax = np.expand_dims(self.emax, 1) return np.squeeze(self._spectrum.eval_eflux(emin, emax, params, self.scale))
[docs]class SpectralFunction(object): """Base class for spectral model classes.""" def __init__(self, params, scale=1.0): self._params = params self._scale = scale @property def params(self): return self._params @property def log_params(self): return self.params_to_log(self._params) @property def scale(self): return self._scale @staticmethod
[docs] def create_functor(spec_type, func_type, emin, emax, scale=1.0): if func_type.lower() == 'flux': return eval(spec_type).create_flux_functor(emin, emax, scale) elif func_type.lower() == 'eflux': return eval(spec_type).create_eflux_functor(emin, emax, scale) else: raise Exception("Did not recognize func_type: %s" % func_type)
@classmethod
[docs] def create_flux_functor(cls, emin, emax, escale=1.0): return SEDFluxFunctor(cls, escale, emin, emax)
@classmethod
[docs] def create_eflux_functor(cls, emin, emax, escale=1.0): return SEDEFluxFunctor(cls, escale, emin, emax)
@classmethod
[docs] def eval_e2dfde(cls, x, params, scale=1.0): x = cast_args(x) params = cast_params(params) return cls._eval_dfde(x, params, scale) * x**2
@classmethod
[docs] def eval_edfde(cls, x, params, scale=1.0): x = cast_args(x) params = cast_params(params) return cls._eval_dfde(x, params, scale) * x
@classmethod
[docs] def eval_dfde(cls, x, params, scale=1.0): x = cast_args(x) params = cast_params(params) return cls._eval_dfde(x, params, scale)
@classmethod
[docs] def eval_dfde_deriv(cls, x, params, scale=1.0): x = cast_args(x) params = cast_params(params) return cls._eval_dfde_deriv(x, params, scale)
@classmethod
[docs] def eval_edfde_deriv(cls, x, params, scale=1.0): x = cast_args(x) params = cast_params(params) dfde_deriv = cls._eval_dfde_deriv(x, params, scale) dfde = cls._eval_dfde(x, params, scale) return x*dfde_deriv + dfde
@classmethod
[docs] def eval_e2dfde_deriv(cls, x, params, scale=1.0): x = cast_args(x) params = cast_params(params) dfde_deriv = cls._eval_dfde_deriv(x, params, scale) dfde = cls._eval_dfde(x, params, scale) return x**2*dfde_deriv + 2*x*dfde
@classmethod def _integrate(cls, fn, emin, emax, params, scale=1.0, npt=20): """Fast numerical integration method using mid-point rule.""" emin = np.expand_dims(emin, -1) emax = np.expand_dims(emax, -1) params = copy.deepcopy(params) for i, p in enumerate(params): params[i] = np.expand_dims(params[i], -1) xedges = np.linspace(0.0, 1.0, npt + 1) logx_edge = np.log(emin) + xedges * (np.log(emax) - np.log(emin)) logx = 0.5 * (logx_edge[..., 1:] + logx_edge[..., :-1]) xw = np.exp(logx_edge[..., 1:]) - np.exp(logx_edge[..., :-1]) dfde = fn(np.exp(logx), params, scale) return np.sum(dfde * xw, axis=-1) @classmethod def _eval_dfde_deriv(cls, x, params, scale=1.0, eps=1E-6): return (cls._eval_dfde(x+eps, params, scale) - cls._eval_dfde(x, params, scale))/eps @classmethod
[docs] def eval_flux(cls, emin, emax, params, scale=1.0): emin = cast_args(emin) emax = cast_args(emax) params = cast_params(params) return cls._integrate(cls.eval_dfde, emin, emax, params, scale)
@classmethod
[docs] def eval_eflux(cls, emin, emax, params, scale=1.0): emin = cast_args(emin) emax = cast_args(emax) params = cast_params(params) return cls._integrate(cls.eval_edfde, emin, emax, params, scale)
[docs] def dfde(self, x, params=None): """Evaluate differential flux.""" params = self.params if params is None else params return np.squeeze(self.eval_dfde(x, params, self.scale))
[docs] def edfde(self, x, params=None): """Evaluate E times differential flux.""" params = self.params if params is None else params return np.squeeze(self.eval_edfde(x, params, self.scale))
[docs] def e2dfde(self, x, params=None): """Evaluate E^2 times differential flux.""" params = self.params if params is None else params return np.squeeze(self.eval_e2dfde(x, params, self.scale))
[docs] def dfde_deriv(self, x, params=None): """Evaluate derivative of the differential flux with respect to E.""" params = self.params if params is None else params return np.squeeze(self.eval_dfde_deriv(x, params, self.scale))
[docs] def edfde_deriv(self, x, params=None): """Evaluate derivative of E times differential flux with respect to E.""" params = self.params if params is None else params return np.squeeze(self.eval_edfde_deriv(x, params, self.scale))
[docs] def e2dfde_deriv(self, x, params=None): """Evaluate derivative of E^2 times differential flux with respect to E.""" params = self.params if params is None else params return np.squeeze(self.eval_e2dfde_deriv(x, params, self.scale))
[docs] def flux(self, emin, emax, params=None): """Evaluate the integral flux.""" params = self.params if params is None else params return np.squeeze(self.eval_flux(emin, emax, params, self.scale))
[docs] def eflux(self, emin, emax, params=None): """Evaluate the energy flux flux.""" params = self.params if params is None else params return np.squeeze(self.eval_eflux(emin, emax, params, self.scale))
[docs]class PowerLaw(SpectralFunction): """Class that evaluates a power-law function with the parameterization: F(x) = p_0 * (x/x_s)^p_1 where x_s is the scale parameter. The `params ` array should be defined with: * params[0] : Prefactor (p_0) * params[1] : Index (p_1) """ def __init__(self, params, scale=1.0): super(PowerLaw, self).__init__(params, scale) @staticmethod def _eval_dfde(x, params, scale=1.0): return params[0] * (x / scale) ** params[1] @classmethod
[docs] def eval_flux(cls, emin, emax, params, scale=1.0): phi0 = np.array(params[0], ndmin=1) index = np.array(params[1], ndmin=1) x0 = scale index1 = index + 1 m = np.isclose(index1, 0.0) iindex0 = np.zeros(index.shape) iindex1 = np.zeros(index.shape) iindex1[~m] = 1. / index1[~m] iindex0[m] = 1. v0 = phi0 * x0 ** (-index) * (np.log(emax) - np.log(emin)) * iindex0 v1 = phi0 * x0 ** (-index) * (emax ** index1 - emin ** index1) * iindex1 return v0 + v1
# y0 = x0 * phi0 * (emin / x0) ** (index + 1) / (index + 1) # y1 = x0 * phi0 * (emax / x0) ** (index + 1) / (index + 1) # v2 = y1 - y0 # return v2 @classmethod
[docs] def eval_eflux(cls, emin, emax, params, scale=1.0): params = copy.deepcopy(params) params[1] += 1.0 return cls.eval_flux(emin, emax, params, scale) * scale
@staticmethod
[docs] def eval_norm(scale, index, emin, emax, flux): return flux / PowerLaw.eval_flux(emin, emax, [1.0, index], scale)
[docs]class LogParabola(SpectralFunction): def __init__(self, params, scale=1.0): super(LogParabola, self).__init__(params, scale) @staticmethod def _eval_dfde(x, params, scale=1.0): return (params[0] * (x / scale) ** (params[1] - params[2] * np.log(x / scale)))
[docs]class PLExpCutoff(SpectralFunction): def __init__(self, params, scale=1.0): super(PLExpCutoff, self).__init__(params, scale) @staticmethod
[docs] def params_to_log(params): return [np.log10(params[0]), params[1], np.log10(params[2])]
@staticmethod
[docs] def log_to_params(params): return [10**params[0], params[1], 10**params[2]]
@staticmethod def _eval_dfde(x, params, scale=1.0): return params[0] * (x / scale) ** (params[1]) * np.exp(-x / params[2]) @classmethod def _eval_dfde_deriv(cls, x, params, scale=1.0): return (cls._eval_dfde(x, params, scale) * (params[1]*params[2] - x)/(params[2]*x))