Source code for fermipy.residmap

import copy
import os
import numpy as np
import scipy.signal
import fermipy.config
import fermipy.defaults as defaults
import fermipy.utils as utils
from fermipy.utils import Map
from fermipy.logger import Logger
from fermipy.logger import logLevel as ll


[docs]def poisson_lnl(nc, mu): nc = np.array(nc, ndmin=1) mu = np.array(mu, ndmin=1) shape = max(nc.shape, mu.shape) lnl = np.zeros(shape) mu = mu * np.ones(shape) nc = nc * np.ones(shape) msk = nc > 0 lnl[msk] = nc[msk] * np.log(mu[msk]) - mu[msk] lnl[~msk] = -mu[~msk] return lnl
[docs]def convolve_map(m, k, cpix, threshold=0.001,imin=0,imax=None): """ Perform an energy-dependent convolution on a sequence of 2-D spatial maps. Parameters ---------- m : `~numpy.ndarray` 3-D map containing a sequence of 2-D spatial maps. First dimension should be energy. k : `~numpy.ndarray` 3-D map containing a sequence of convolution kernels (PSF) for each slice in m. This map should have the same dimension as m. cpix : list Indices of kernel reference pixel in the two spatial dimensions. threshold : float Kernel amplitude imin : int Minimum index in energy dimension. imax : int Maximum index in energy dimension. """ islice = slice(imin,imax) o = np.zeros(m[islice,...].shape) # Loop over energy for i in range(m[islice,...].shape[0]): ks = k[islice,...][i,...] ms = m[islice,...][i,...] mx = ks[cpix[0], :] > ks[cpix[0], cpix[1]] * threshold my = ks[:, cpix[1]] > ks[cpix[0], cpix[1]] * threshold nx = max(3, np.round(np.sum(mx) / 2.)) ny = max(3, np.round(np.sum(my) / 2.)) # Ensure that there is an odd number of pixels in the kernel # array if cpix[0] + nx + 1 >= ms.shape[0] or cpix[0]-nx < 0: nx -= 1 ny -= 1 sx = slice(cpix[0] - nx, cpix[0] + nx + 1) sy = slice(cpix[1] - ny, cpix[1] + ny + 1) ks = ks[sx, sy] # origin = [0, 0] # if ks.shape[0] % 2 == 0: origin[0] += 1 # if ks.shape[1] % 2 == 0: origin[1] += 1 # o[i,...] = ndimage.convolve(ms, ks, mode='constant', # origin=origin, cval=0.0) o[i,...] = scipy.signal.fftconvolve(ms, ks, mode='same') return o
[docs]def get_source_kernel(gta, name, kernel=None): """Get the PDF for the given source.""" sm = [] zs = 0 for c in gta.components: z = c.model_counts_map(name).counts.astype('float') if kernel is not None: shape = (z.shape[0],) + kernel.shape z = np.apply_over_axes(np.sum, z, axes=[1, 2]) * np.ones( shape) * kernel[np.newaxis, :, :] zs += np.sum(z) else: zs += np.sum(z) sm.append(z) sm2 = 0 for i, m in enumerate(sm): sm[i] /= zs sm2 += np.sum(sm[i] ** 2) for i, m in enumerate(sm): sm[i] /= sm2 return sm
[docs]class ResidMapGenerator(fermipy.config.Configurable): """This class generates spatial residual maps from the difference of data and model maps smoothed with a user-defined spatial/spectral template. The resulting map of source significance can be interpreted in the same way as the TS map (the likelihood of a source at the given location). The algorithm approximates the best-fit source amplitude that would be derived from a least-squares fit to the data.""" defaults = dict(defaults.residmap.items(), fileio=defaults.fileio, logging=defaults.logging) def __init__(self, config=None, **kwargs): # super(ResidMapGenerator,self).__init__(config,**kwargs) fermipy.config.Configurable.__init__(self, config, **kwargs) self.logger = Logger.get(self.__class__.__name__, self.config['fileio']['logfile'], ll(self.config['logging']['verbosity']))
[docs] def run(self, gta, prefix, **kwargs): models = kwargs.get('models', self.config['models']) if isinstance(models,dict): models = [models] o = [] for m in models: self.logger.info('Generating Residual map') self.logger.info(m) o += [self.make_residual_map(gta,prefix,copy.deepcopy(m),**kwargs)] return o
[docs] def make_residual_map(self, gta, prefix, src_dict=None, **kwargs): exclude = kwargs.get('exclude', None) erange = kwargs.get('erange', self.config['erange']) make_fits = kwargs.get('make_fits', True) if erange is not None: if len(erange) == 0: erange = [None,None] elif len(erange) == 1: erange += [None] erange[0] = (erange[0] if erange[0] is not None else gta.energies[0]) erange[1] = (erange[1] if erange[1] is not None else gta.energies[-1]) else: erange = [gta.energies[0],gta.energies[-1]] # Put the test source at the pixel closest to the ROI center xpix, ypix = (np.round((gta.npix - 1.0) / 2.), np.round((gta.npix - 1.0) / 2.)) cpix = np.array([xpix, ypix]) skywcs = gta._skywcs skydir = utils.pix_to_skydir(cpix[0], cpix[1], skywcs) if src_dict is None: src_dict = {} src_dict['ra'] = skydir.ra.deg src_dict['dec'] = skydir.dec.deg src_dict.setdefault('SpatialModel', 'PointSource') src_dict.setdefault('SpatialWidth', 0.3) src_dict.setdefault('Index', 2.0) kernel = None if src_dict['SpatialModel'] == 'Gaussian': kernel = utils.make_gaussian_kernel(src_dict['SpatialWidth'], cdelt=gta.components[0].binsz, npix=101) kernel /= np.sum(kernel) cpix = [50, 50] gta.add_source('residmap_testsource', src_dict, free=True, init_source=False,save_source_maps=False) src = gta.roi.get_source_by_name('residmap_testsource', True) modelname = utils.create_model_name(src) npix = gta.components[0].npix mmst = np.zeros((npix, npix)) cmst = np.zeros((npix, npix)) emst = np.zeros((npix, npix)) sm = get_source_kernel(gta,'residmap_testsource', kernel) ts = np.zeros((npix, npix)) sigma = np.zeros((npix, npix)) excess = np.zeros((npix, npix)) gta.delete_source('residmap_testsource') for i, c in enumerate(gta.components): imin = utils.val_to_edge(c.energies,erange[0])[0] imax = utils.val_to_edge(c.energies,erange[1])[0] mc = c.model_counts_map(exclude=exclude).counts.astype('float') cc = c.counts_map().counts.astype('float') ec = np.ones(mc.shape) ccs = convolve_map(cc, sm[i], cpix,imin=imin,imax=imax) mcs = convolve_map(mc, sm[i], cpix,imin=imin,imax=imax) ecs = convolve_map(ec, sm[i], cpix,imin=imin,imax=imax) cms = np.sum(ccs, axis=0) mms = np.sum(mcs, axis=0) ems = np.sum(ecs, axis=0) cmst += cms mmst += mms emst += ems # cts = 2.0 * (poisson_lnl(cms, cms) - poisson_lnl(cms, mms)) excess += cms - mms ts = 2.0 * (poisson_lnl(cmst, cmst) - poisson_lnl(cmst, mmst)) sigma = np.sqrt(ts) sigma[excess < 0] *= -1 sigma_map_file = utils.format_filename(self.config['fileio']['workdir'], 'residmap_sigma.fits', prefix=[prefix, modelname]) data_map_file = utils.format_filename(self.config['fileio']['workdir'], 'residmap_data.fits', prefix=[prefix, modelname]) model_map_file = utils.format_filename(self.config['fileio']['workdir'], 'residmap_model.fits', prefix=[prefix, modelname]) excess_map_file = utils.format_filename(self.config['fileio']['workdir'], 'residmap_excess.fits', prefix=[prefix, modelname]) emst /= np.max(emst) sigma_map = Map(sigma, skywcs) model_map = Map(mmst / emst, skywcs) data_map = Map(cmst / emst, skywcs) excess_map = Map(excess / emst, skywcs) o = {'name': '%s_%s' % (prefix, modelname), 'file': None, 'wcs': skywcs, 'sigma': sigma_map, 'model': model_map, 'data': data_map, 'excess': excess_map } if make_fits: fits_file = utils.format_filename(self.config['fileio']['workdir'], 'residmap.fits', prefix=[prefix,modelname]) utils.write_maps(sigma_map, {'DATA_MAP': data_map, 'MODEL_MAP': model_map, 'EXCESS_MAP': excess_map }, fits_file) o['file'] = os.path.basename(fits_file) return o