Source code for fermipy.utils

# Licensed under a 3-clause BSD style license - see LICENSE.rst
from __future__ import absolute_import, division, print_function
import os
import re
import copy
import tempfile
import functools
from collections import OrderedDict
import xml.etree.cElementTree as et
import yaml
import numpy as np
import scipy.optimize
from scipy.ndimage import map_coordinates
from scipy.interpolate import UnivariateSpline
from scipy.optimize import brentq
from scipy.ndimage.measurements import label
import scipy.special as special
from numpy.core import defchararray
from astropy.extern import six


[docs]def init_matplotlib_backend(backend=None): """This function initializes the matplotlib backend. When no DISPLAY is available the backend is automatically set to 'Agg'. Parameters ---------- backend : str matplotlib backend name. """ import matplotlib try: os.environ['DISPLAY'] except KeyError: matplotlib.use('Agg') else: if backend is not None: matplotlib.use(backend)
[docs]def unicode_representer(dumper, uni): node = yaml.ScalarNode(tag=u'tag:yaml.org,2002:str', value=uni) return node
yaml.add_representer(six.text_type, unicode_representer)
[docs]def load_yaml(infile, **kwargs): return yaml.load(open(infile), **kwargs)
[docs]def write_yaml(o, outfile, **kwargs): yaml.dump(tolist(o), open(outfile, 'w'), **kwargs)
[docs]def load_npy(infile): return np.load(infile).flat[0]
[docs]def load_data(infile, workdir=None): """Load python data structure from either a YAML or numpy file. """ infile = resolve_path(infile, workdir=workdir) infile, ext = os.path.splitext(infile) if os.path.isfile(infile + '.npy'): infile += '.npy' elif os.path.isfile(infile + '.yaml'): infile += '.yaml' else: raise Exception('Input file does not exist.') ext = os.path.splitext(infile)[1] if ext == '.npy': return infile, load_npy(infile) elif ext == '.yaml': return infile, load_yaml(infile) else: raise Exception('Unrecognized extension.')
[docs]def resolve_path(path, workdir=None): if os.path.isabs(path): return path elif workdir is None: return os.path.abspath(path) else: return os.path.join(workdir, path)
[docs]def resolve_file_path(path, **kwargs): dirs = kwargs.get('search_dirs', []) expand = kwargs.get('expand', False) if path is None: return None out_path = None if os.path.isabs(os.path.expandvars(path)) and \ os.path.isfile(os.path.expandvars(path)): out_path = path else: for d in dirs: if not os.path.isdir(os.path.expandvars(d)): continue p = os.path.join(d, path) if os.path.isfile(os.path.expandvars(p)): out_path = p break if out_path is None: raise Exception('Failed to resolve file path: %s' % path) if expand: out_path = os.path.expandvars(out_path) return out_path
[docs]def resolve_file_path_list(pathlist, workdir, prefix='', randomize=False): """Resolve the path of each file name in the file ``pathlist`` and write the updated paths to a new file. """ files = [] with open(pathlist, 'r') as f: files = [line.strip() for line in f] newfiles = [] for f in files: f = os.path.expandvars(f) if os.path.isfile(f): newfiles += [f] else: newfiles += [os.path.join(workdir, f)] if randomize: _, tmppath = tempfile.mkstemp(prefix=prefix, dir=workdir) else: tmppath = os.path.join(workdir, prefix) tmppath += '.txt' with open(tmppath, 'w') as tmpfile: tmpfile.write("\n".join(newfiles)) return tmppath
[docs]def is_fits_file(path): if (path.endswith('.fit') or path.endswith('.fits') or path.endswith('.fit.gz') or path.endswith('.fits.gz')): return True else: return False
[docs]def collect_dirs(path, max_depth=1, followlinks=True): """Recursively find directories under the given path.""" if not os.path.isdir(path): return [] o = [path] if max_depth == 0: return o for subdir in os.listdir(path): subdir = os.path.join(path, subdir) if not os.path.isdir(subdir): continue o += [subdir] if os.path.islink(subdir) and not followlinks: continue if max_depth > 0: o += collect_dirs(subdir, max_depth=max_depth - 1) return list(set(o))
[docs]def match_regex_list(patterns, string): """Perform a regex match of a string against a list of patterns. Returns true if the string matches at least one pattern in the list.""" for p in patterns: if re.findall(p, string): return True return False
[docs]def find_rows_by_string(tab, names, colnames=['assoc']): """Find the rows in a table ``tab`` that match at least one of the strings in ``names``. This method ignores whitespace and case when matching strings. Parameters ---------- tab : `astropy.table.Table` Table that will be searched. names : list List of strings. colname : str Name of the table column that will be searched for matching string. Returns ------- mask : `~numpy.ndarray` Boolean mask for rows with matching strings. """ mask = np.empty(len(tab), dtype=bool) mask.fill(False) names = [name.lower().replace(' ', '') for name in names] for colname in colnames: if colname not in tab.columns: continue col = tab[[colname]].copy() col[colname] = defchararray.replace(defchararray.lower(col[colname]).astype(str), ' ', '') for name in names: mask |= col[colname] == name return mask
[docs]def join_strings(strings, sep='_'): if strings is None: return '' else: if not isinstance(strings, list): strings = [strings] return sep.join([s for s in strings if s])
[docs]def format_filename(outdir, basename, prefix=None, extension=None): filename = join_strings(prefix) filename = join_strings([filename, basename]) if extension is not None: if extension.startswith('.'): filename += extension else: filename += '.' + extension return os.path.join(outdir, filename)
[docs]def strip_suffix(filename, suffix): for s in suffix: filename = re.sub(r'\.%s$' % s, '', filename) return filename
[docs]def met_to_mjd(time): """"Convert mission elapsed time to mean julian date.""" return 54682.65 + (time - 239557414.0) / (86400.)
RA_NGP = np.radians(192.8594812065348) DEC_NGP = np.radians(27.12825118085622) L_CP = np.radians(122.9319185680026)
[docs]def gal2eq(l, b): L_0 = L_CP - np.pi / 2. RA_0 = RA_NGP + np.pi / 2. l = np.array(l, ndmin=1) b = np.array(b, ndmin=1) l, b = np.radians(l), np.radians(b) sind = np.sin(b) * np.sin(DEC_NGP) + np.cos(b) * np.cos(DEC_NGP) * np.sin( l - L_0) dec = np.arcsin(sind) cosa = np.cos(l - L_0) * np.cos(b) / np.cos(dec) sina = (np.cos(b) * np.sin(DEC_NGP) * np.sin(l - L_0) - np.sin(b) * np.cos( DEC_NGP)) / np.cos(dec) dec = np.degrees(dec) cosa[cosa < -1.0] = -1.0 cosa[cosa > 1.0] = 1.0 ra = np.arccos(cosa) ra[np.where(sina < 0.)] = -ra[np.where(sina < 0.)] ra = np.degrees(ra + RA_0) ra = np.mod(ra, 360.) dec = np.mod(dec + 90., 180.) - 90. return ra, dec
[docs]def eq2gal(ra, dec): L_0 = L_CP - np.pi / 2. RA_0 = RA_NGP + np.pi / 2. DEC_0 = np.pi / 2. - DEC_NGP ra = np.array(ra, ndmin=1) dec = np.array(dec, ndmin=1) ra, dec = np.radians(ra), np.radians(dec) np.sinb = np.sin(dec) * np.cos(DEC_0) - np.cos(dec) * np.sin( ra - RA_0) * np.sin(DEC_0) b = np.arcsin(np.sinb) cosl = np.cos(dec) * np.cos(ra - RA_0) / np.cos(b) sinl = (np.sin(dec) * np.sin(DEC_0) + np.cos(dec) * np.sin( ra - RA_0) * np.cos(DEC_0)) / np.cos(b) b = np.degrees(b) cosl[cosl < -1.0] = -1.0 cosl[cosl > 1.0] = 1.0 l = np.arccos(cosl) l[np.where(sinl < 0.)] = - l[np.where(sinl < 0.)] l = np.degrees(l + L_0) l = np.mod(l, 360.) b = np.mod(b + 90., 180.) - 90. return l, b
[docs]def xyz_to_lonlat(*args): if len(args) == 1: x, y, z = args[0][0], args[0][1], args[0][2] else: x, y, z = args[0], args[1], args[2] lat = np.pi / 2. - np.arctan2(np.sqrt(x ** 2 + y ** 2), z) lon = np.arctan2(y, x) return lon, lat
[docs]def lonlat_to_xyz(lon, lat): phi = lon theta = np.pi / 2. - lat return np.array([np.sin(theta) * np.cos(phi), np.sin(theta) * np.sin(phi), np.cos(theta)])
[docs]def project(lon0, lat0, lon1, lat1): """This function performs a stereographic projection on the unit vector (lon1,lat1) with the pole defined at the reference unit vector (lon0,lat0).""" costh = np.cos(np.pi / 2. - lat0) cosphi = np.cos(lon0) sinth = np.sin(np.pi / 2. - lat0) sinphi = np.sin(lon0) xyz = lonlat_to_xyz(lon1, lat1) x1 = xyz[0] y1 = xyz[1] z1 = xyz[2] x1p = x1 * costh * cosphi + y1 * costh * sinphi - z1 * sinth y1p = -x1 * sinphi + y1 * cosphi z1p = x1 * sinth * cosphi + y1 * sinth * sinphi + z1 * costh r = np.arctan2(np.sqrt(x1p ** 2 + y1p ** 2), z1p) phi = np.arctan2(y1p, x1p) return r * np.cos(phi), r * np.sin(phi)
[docs]def separation_cos_angle(lon0, lat0, lon1, lat1): """Evaluate the cosine of the angular separation between two direction vectors.""" return (np.sin(lat1) * np.sin(lat0) + np.cos(lat1) * np.cos(lat0) * np.cos(lon1 - lon0))
[docs]def dot_prod(xyz0, xyz1): """Compute the dot product between two cartesian vectors where the second dimension contains the vector components.""" return np.sum(xyz0 * xyz1, axis=1)
[docs]def angle_to_cartesian(lon, lat): """Convert spherical coordinates to cartesian unit vectors.""" theta = np.array(np.pi / 2. - lat) return np.vstack((np.sin(theta) * np.cos(lon), np.sin(theta) * np.sin(lon), np.cos(theta))).T
[docs]def scale_parameter(p): if isstr(p): p = float(p) if p > 0: scale = 10 ** -np.round(np.log10(1. / p)) return p / scale, scale else: return p, 1.0
[docs]def update_bounds(val, bounds): return min(val, bounds[0]), max(val, bounds[1])
[docs]def apply_minmax_selection(val, val_minmax): if val_minmax is None: return True if val_minmax[0] is None: min_cut = True elif np.isfinite(val) and val >= val_minmax[0]: min_cut = True else: min_cut = False if val_minmax[1] is None: max_cut = True elif np.isfinite(val) and val <= val_minmax[1]: max_cut = True else: max_cut = False return (min_cut and max_cut)
[docs]def create_source_name(skydir, floor=True, prefix='PS'): hms = skydir.icrs.ra.hms dms = skydir.icrs.dec.dms if floor: ra_ms = np.floor(10. * (hms.m + hms.s / 60.)) / 10. dec_ms = np.floor(np.abs(dms.m + dms.s / 60.)) else: ra_ms = (hms.m + hms.s / 60.) dec_ms = np.abs(dms.m + dms.s / 60.) return '%s J%02.f%04.1f%+03.f%02.f' % (prefix, hms.h, ra_ms, dms.d, dec_ms)
[docs]def create_model_name(src): """Generate a name for a source object given its spatial/spectral properties. Parameters ---------- src : `~fermipy.roi_model.Source` A source object. Returns ------- name : str A source name. """ o = '' spatial_type = src['SpatialModel'].lower() o += spatial_type if spatial_type == 'gaussian': o += '_s%04.2f' % src['SpatialWidth'] if src['SpectrumType'] == 'PowerLaw': o += '_powerlaw_%04.2f' % float(src.spectral_pars['Index']['value']) else: o += '_%s' % (src['SpectrumType'].lower()) return o
[docs]def cov_to_correlation(cov): """Compute the correlation matrix given the covariance matrix. Parameters ---------- cov : `~numpy.ndarray` N x N matrix of covariances among N parameters. Returns ------- corr : `~numpy.ndarray` N x N matrix of correlations among N parameters. """ err = np.sqrt(np.diag(cov)) errinv = np.ones_like(err) * np.nan m = np.isfinite(err) & (err != 0) errinv[m] = 1. / err[m] corr = np.array(cov) return corr * np.outer(errinv, errinv)
[docs]def ellipse_to_cov(sigma_maj, sigma_min, theta): """Compute the covariance matrix in two variables x and y given the std. deviation along the semi-major and semi-minor axes and the rotation angle of the error ellipse. Parameters ---------- sigma_maj : float Std. deviation along major axis of error ellipse. sigma_min : float Std. deviation along minor axis of error ellipse. theta : float Rotation angle in radians from x-axis to ellipse major axis. """ cth = np.cos(theta) sth = np.sin(theta) covxx = cth**2 * sigma_maj**2 + sth**2 * sigma_min**2 covyy = sth**2 * sigma_maj**2 + cth**2 * sigma_min**2 covxy = cth * sth * sigma_maj**2 - cth * sth * sigma_min**2 return np.array([[covxx, covxy], [covxy, covyy]])
[docs]def twosided_cl_to_dlnl(cl): """Compute the delta-loglikehood value that corresponds to a two-sided interval of the given confidence level. Parameters ---------- cl : float Confidence level. Returns ------- dlnl : float Delta-loglikelihood value with respect to the maximum of the likelihood function. """ return 0.5 * np.power(np.sqrt(2.) * special.erfinv(cl), 2)
[docs]def twosided_dlnl_to_cl(dlnl): """Compute the confidence level that corresponds to a two-sided interval with a given change in the loglikelihood value. Parameters ---------- dlnl : float Delta-loglikelihood value with respect to the maximum of the likelihood function. Returns ------- cl : float Confidence level. """ return special.erf(dlnl**0.5)
[docs]def onesided_cl_to_dlnl(cl): """Compute the delta-loglikehood values that corresponds to an upper limit of the given confidence level. Parameters ---------- cl : float Confidence level. Returns ------- dlnl : float Delta-loglikelihood value with respect to the maximum of the likelihood function. """ alpha = 1.0 - cl return 0.5 * np.power(np.sqrt(2.) * special.erfinv(1 - 2 * alpha), 2.)
[docs]def onesided_dlnl_to_cl(dlnl): """Compute the confidence level that corresponds to an upper limit with a given change in the loglikelihood value. Parameters ---------- dlnl : float Delta-loglikelihood value with respect to the maximum of the likelihood function. Returns ------- cl : float Confidence level. """ alpha = (1.0 - special.erf(dlnl**0.5)) / 2.0 return 1.0 - alpha
[docs]def interpolate_function_min(x, y): sp = scipy.interpolate.splrep(x, y, k=2, s=0) def fn(t): return scipy.interpolate.splev(t, sp, der=1) if np.sign(fn(x[0])) == np.sign(fn(x[-1])): if np.sign(fn(x[0])) == -1: return x[-1] else: return x[0] x0 = scipy.optimize.brentq(fn, x[0], x[-1], xtol=1e-10 * np.median(x)) return x0
[docs]def find_function_root(fn, x0, xb, delta=0.0, bounds=None): """Find the root of a function: f(x)+delta in the interval encompassed by x0 and xb. Parameters ---------- fn : function Python function. x0 : float Fixed bound for the root search. This will either be used as the lower or upper bound depending on the relative value of xb. xb : float Upper or lower bound for the root search. If a root is not found in the interval [x0,xb]/[xb,x0] this value will be increased/decreased until a change in sign is found. """ if x0 == xb: return np.nan for i in range(10): if np.sign(fn(xb) + delta) != np.sign(fn(x0) + delta): break if bounds is not None and (xb < bounds[0] or xb > bounds[1]): break if xb < x0: xb *= 0.5 else: xb *= 2.0 # Failed to find a root if np.sign(fn(xb) + delta) == np.sign(fn(x0) + delta): return np.nan if x0 == 0: xtol = 1e-10 * np.abs(xb) else: xtol = 1e-10 * np.abs(xb + x0) return brentq(lambda t: fn(t) + delta, x0, xb, xtol=xtol)
[docs]def get_parameter_limits(xval, loglike, cl_limit=0.95, cl_err=0.68269, tol=1E-2, bounds=None): """Compute upper/lower limits, peak position, and 1-sigma errors from a 1-D likelihood function. This function uses the delta-loglikelihood method to evaluate parameter limits by searching for the point at which the change in the log-likelihood value with respect to the maximum equals a specific value. A cubic spline fit to the log-likelihood values is used to improve the accuracy of the calculation. Parameters ---------- xval : `~numpy.ndarray` Array of parameter values. loglike : `~numpy.ndarray` Array of log-likelihood values. cl_limit : float Confidence level to use for limit calculation. cl_err : float Confidence level to use for two-sided confidence interval calculation. tol : float Absolute precision of likelihood values. Returns ------- x0 : float Coordinate at maximum of likelihood function. err_lo : float Lower error for two-sided confidence interval with CL ``cl_err``. Corresponds to point (x < x0) at which the log-likelihood falls by a given value with respect to the maximum (0.5 for 1 sigma). Set to nan if the change in the log-likelihood function at the lower bound of the ``xval`` input array is less than than the value for the given CL. err_hi : float Upper error for two-sided confidence interval with CL ``cl_err``. Corresponds to point (x > x0) at which the log-likelihood falls by a given value with respect to the maximum (0.5 for 1 sigma). Set to nan if the change in the log-likelihood function at the upper bound of the ``xval`` input array is less than the value for the given CL. err : float Symmetric 1-sigma error. Average of ``err_lo`` and ``err_hi`` if both are defined. ll : float Lower limit evaluated at confidence level ``cl_limit``. ul : float Upper limit evaluated at confidence level ``cl_limit``. lnlmax : float Log-likelihood value at ``x0``. """ dlnl_limit = onesided_cl_to_dlnl(cl_limit) dlnl_err = twosided_cl_to_dlnl(cl_err) try: # Pad the likelihood function # if len(xval) >= 3 and np.max(loglike) - loglike[-1] < 1.5*dlnl_limit: # p = np.polyfit(xval[-3:], loglike[-3:], 2) # x = np.linspace(xval[-1], 10 * xval[-1], 3)[1:] # y = np.polyval(p, x) # x = np.concatenate((xval, x)) # y = np.concatenate((loglike, y)) # else: x, y = xval, loglike spline = UnivariateSpline(x, y, k=2, #k=min(len(xval) - 1, 3), w=(1 / tol) * np.ones(len(x))) except: print("Failed to create spline: ", xval, loglike) return {'x0': np.nan, 'ul': np.nan, 'll': np.nan, 'err_lo': np.nan, 'err_hi': np.nan, 'err': np.nan, 'lnlmax': np.nan} sd = spline.derivative() imax = np.argmax(loglike) ilo = max(imax - 1, 0) ihi = min(imax + 1, len(xval) - 1) # Find the peak x0 = xval[imax] # Refine the peak position if np.sign(sd(xval[ilo])) != np.sign(sd(xval[ihi])): x0 = find_function_root(sd, xval[ilo], xval[ihi]) lnlmax = float(spline(x0)) def fn(t): return spline(t) - lnlmax fn_val = fn(xval) if np.any(fn_val[imax:] < -dlnl_limit): xhi = xval[imax:][fn_val[imax:] < -dlnl_limit][0] else: xhi = xval[-1] # EAC: brute force check that xhi is greater than x0 # The fabs is here in case x0 is negative if xhi <= x0: xhi = x0 + np.fabs(x0) if np.any(fn_val[:imax] < -dlnl_limit): xlo = xval[:imax][fn_val[:imax] < -dlnl_limit][-1] else: xlo = xval[0] # EAC: brute force check that xlo is less than x0 # The fabs is here in case x0 is negative if xlo >= x0: xlo = x0 - 0.5*np.fabs(x0) ul = find_function_root(fn, x0, xhi, dlnl_limit, bounds=bounds) ll = find_function_root(fn, x0, xlo, dlnl_limit, bounds=bounds) err_lo = np.abs(x0 - find_function_root(fn, x0, xlo, dlnl_err, bounds=bounds)) err_hi = np.abs(x0 - find_function_root(fn, x0, xhi, dlnl_err, bounds=bounds)) err = np.nan if np.isfinite(err_lo) and np.isfinite(err_hi): err = 0.5 * (err_lo + err_hi) elif np.isfinite(err_hi): err = err_hi elif np.isfinite(err_lo): err = err_lo o = {'x0': x0, 'ul': ul, 'll': ll, 'err_lo': err_lo, 'err_hi': err_hi, 'err': err, 'lnlmax': lnlmax} return o
[docs]def poly_to_parabola(coeff): sigma = np.sqrt(1. / np.abs(2.0 * coeff[0])) x0 = -coeff[1] / (2 * coeff[0]) y0 = (1. - (coeff[1] ** 2 - 4 * coeff[0] * coeff[2])) / (4 * coeff[0]) return x0, sigma, y0
[docs]def parabola(xy, amplitude, x0, y0, sx, sy, theta): """Evaluate a 2D parabola given by: f(x,y) = f_0 - (1/2) * \delta^T * R * \Sigma * R^T * \delta where \delta = [(x - x_0), (y - y_0)] and R is the matrix for a 2D rotation by angle \theta and \Sigma is the covariance matrix: \Sigma = [[1/\sigma_x^2, 0 ], [0 , 1/\sigma_y^2]] Parameters ---------- xy : tuple Tuple containing x and y arrays for the values at which the parabola will be evaluated. amplitude : float Constant offset value. x0 : float Centroid in x coordinate. y0 : float Centroid in y coordinate. sx : float Standard deviation along first axis (x-axis when theta=0). sy : float Standard deviation along second axis (y-axis when theta=0). theta : float Rotation angle in radians. Returns ------- vals : `~numpy.ndarray` Values of the parabola evaluated at the points defined in the `xy` input tuple. """ x = xy[0] y = xy[1] cth = np.cos(theta) sth = np.sin(theta) a = (cth ** 2) / (2 * sx ** 2) + (sth ** 2) / (2 * sy ** 2) b = -(np.sin(2 * theta)) / (4 * sx ** 2) + (np.sin(2 * theta)) / ( 4 * sy ** 2) c = (sth ** 2) / (2 * sx ** 2) + (cth ** 2) / (2 * sy ** 2) vals = amplitude - (a * ((x - x0) ** 2) + 2 * b * (x - x0) * (y - y0) + c * ((y - y0) ** 2)) return vals
[docs]def get_bounded_slice(idx, dpix, shape): dpix = int(dpix) idx_lo = idx - dpix idx_hi = idx + dpix + 1 if idx_lo < 0: idx_lo = max(idx_lo, 0) idx_hi = idx_lo + (2 * dpix + 1) elif idx_hi > shape: idx_hi = min(idx_hi, shape) idx_lo = idx_hi - (2 * dpix + 1) return slice(idx_lo, idx_hi)
[docs]def get_region_mask(z, delta, xy=None): """Get mask of connected region within delta of max(z).""" if xy is None: ix, iy = np.unravel_index(np.argmax(z), z.shape) else: ix, iy = xy mz = (z > z[ix, iy] - delta) labels = label(mz)[0] mz &= labels == labels[ix, iy] return mz
[docs]def fit_parabola(z, ix, iy, dpix=3, zmin=None): """Fit a parabola to a 2D numpy array. This function will fit a parabola with the functional form described in `~fermipy.utils.parabola` to a 2D slice of the input array `z`. The fit region encompasses pixels that are within `dpix` of the pixel coordinate (iz,iy) OR that have a value relative to the peak value greater than `zmin`. Parameters ---------- z : `~numpy.ndarray` ix : int X index of center pixel of fit region in array `z`. iy : int Y index of center pixel of fit region in array `z`. dpix : int Max distance from center pixel of fit region. zmin : float """ offset = make_pixel_distance(z.shape, iy, ix) x, y = np.meshgrid(np.arange(z.shape[0]), np.arange(z.shape[1]), indexing='ij') m = (offset <= dpix) if np.sum(m) < 9: m = (offset <= dpix + 0.5) if zmin is not None: m |= get_region_mask(z, np.abs(zmin), (ix, iy)) sx = get_bounded_slice(ix, dpix, z.shape[0]) sy = get_bounded_slice(iy, dpix, z.shape[1]) coeffx = poly_to_parabola(np.polyfit(x[sx, iy], z[sx, iy], 2)) coeffy = poly_to_parabola(np.polyfit(y[ix, sy], z[ix, sy], 2)) #p0 = [coeffx[2], coeffx[0], coeffy[0], coeffx[1], coeffy[1], 0.0] p0 = [coeffx[2], float(ix), float(iy), coeffx[1], coeffy[1], 0.0] o = {'fit_success': True, 'p0': p0} def curve_fit_fn(*args): return np.ravel(parabola(*args)) try: bounds = (-np.inf * np.ones(6), np.inf * np.ones(6)) bounds[0][1] = -0.5 bounds[0][2] = -0.5 bounds[1][1] = z.shape[0] - 0.5 bounds[1][2] = z.shape[1] - 0.5 popt, pcov = scipy.optimize.curve_fit(curve_fit_fn, (np.ravel(x[m]), np.ravel(y[m])), np.ravel(z[m]), p0, bounds=bounds) except Exception: popt = copy.deepcopy(p0) o['fit_success'] = False fm = parabola((x[m], y[m]), *popt) df = fm - z[m] rchi2 = np.sum(df ** 2) / len(fm) o['rchi2'] = rchi2 o['x0'] = popt[1] o['y0'] = popt[2] o['sigmax'] = np.abs(popt[3]) o['sigmay'] = np.abs(popt[4]) o['sigma'] = np.sqrt(o['sigmax'] ** 2 + o['sigmay'] ** 2) o['z0'] = popt[0] o['theta'] = popt[5] o['popt'] = popt o['mask'] = m a = max(o['sigmax'], o['sigmay']) b = min(o['sigmax'], o['sigmay']) o['eccentricity'] = np.sqrt(1 - b ** 2 / a ** 2) o['eccentricity2'] = np.sqrt(a ** 2 / b ** 2 - 1) return o
[docs]def split_bin_edges(edges, npts=2): """Subdivide an array of bins by splitting each bin into ``npts`` subintervals. Parameters ---------- edges : `~numpy.ndarray` Bin edge array. npts : int Number of intervals into which each bin will be subdivided. Returns ------- edges : `~numpy.ndarray` Subdivided bin edge array. """ if npts < 2: return edges x = (edges[:-1, None] + (edges[1:, None] - edges[:-1, None]) * np.linspace(0.0, 1.0, npts + 1)[None, :]) return np.unique(np.ravel(x))
[docs]def center_to_edge(center): if len(center) == 1: delta = np.array(1.0, ndmin=1) else: delta = center[1:] - center[:-1] edges = 0.5 * (center[1:] + center[:-1]) edges = np.insert(edges, 0, center[0] - 0.5 * delta[0]) edges = np.append(edges, center[-1] + 0.5 * delta[-1]) return edges
[docs]def edge_to_center(edges): return 0.5 * (edges[1:] + edges[:-1])
[docs]def edge_to_width(edges): return (edges[1:] - edges[:-1])
[docs]def val_to_bin(edges, x): """Convert axis coordinate to bin index.""" ibin = np.digitize(np.array(x, ndmin=1), edges) - 1 return ibin
[docs]def val_to_pix(center, x): return np.interp(x, center, np.arange(len(center)).astype(float))
[docs]def val_to_edge(edges, x): """Convert axis coordinate to bin index.""" edges = np.array(edges) w = edges[1:] - edges[:-1] w = np.insert(w, 0, w[0]) ibin = np.digitize(np.array(x, ndmin=1), edges - 0.5 * w) - 1 ibin[ibin < 0] = 0 return ibin
[docs]def val_to_bin_bounded(edges, x): """Convert axis coordinate to bin index.""" nbins = len(edges) - 1 ibin = val_to_bin(edges, x) ibin[ibin < 0] = 0 ibin[ibin > nbins - 1] = nbins - 1 return ibin
[docs]def extend_array(edges, binsz, lo, hi): """Extend an array to encompass lo and hi values.""" numlo = int(np.ceil((edges[0] - lo) / binsz)) numhi = int(np.ceil((hi - edges[-1]) / binsz)) edges = copy.deepcopy(edges) if numlo > 0: edges_lo = np.linspace(edges[0] - numlo * binsz, edges[0], numlo + 1) edges = np.concatenate((edges_lo[:-1], edges)) if numhi > 0: edges_hi = np.linspace(edges[-1], edges[-1] + numhi * binsz, numhi + 1) edges = np.concatenate((edges, edges_hi[1:])) return edges
[docs]def mkdir(dir): if not os.path.exists(dir): os.makedirs(dir) return dir
[docs]def fits_recarray_to_dict(table): """Convert a FITS recarray to a python dictionary.""" cols = {} for icol, col in enumerate(table.columns.names): col_data = table.data[col] if type(col_data[0]) == np.float32: cols[col] = np.array(col_data, dtype=float) elif type(col_data[0]) == np.float64: cols[col] = np.array(col_data, dtype=float) elif type(col_data[0]) == str: cols[col] = np.array(col_data, dtype=str) elif type(col_data[0]) == np.string_: cols[col] = np.array(col_data, dtype=str) elif type(col_data[0]) == np.int16: cols[col] = np.array(col_data, dtype=int) elif type(col_data[0]) == np.ndarray: cols[col] = np.array(col_data) else: raise Exception( 'Unrecognized column type: %s %s' % (col, str(type(col_data)))) return cols
[docs]def unicode_to_str(args): o = {} for k, v in args.items(): if isstr(v): o[k] = str(v) else: o[k] = v return o
[docs]def isstr(s): """String instance testing method that works under both Python 2.X and 3.X. Returns true if the input is a string.""" try: return isinstance(s, basestring) except NameError: return isinstance(s, str)
[docs]def xmlpath_to_path(path): if path is None: return path return re.sub(r'\$\(([a-zA-Z\_]+)\)', r'$\1', path)
[docs]def path_to_xmlpath(path): if path is None: return path return re.sub(r'\$([a-zA-Z\_]+)', r'$(\1)', path)
[docs]def create_xml_element(root, name, attrib): el = et.SubElement(root, name) for k, v in attrib.iteritems(): if isinstance(v, bool): el.set(k, str(int(v))) elif isstr(v): el.set(k, v) elif np.isfinite(v): el.set(k, str(v)) return el
[docs]def load_xml_elements(root, path): o = {} for p in root.findall(path): if 'name' in p.attrib: o[p.attrib['name']] = copy.deepcopy(p.attrib) else: o.update(p.attrib) return o
[docs]def prettify_xml(elem): """Return a pretty-printed XML string for the Element. """ from xml.dom import minidom import xml.etree.cElementTree as et rough_string = et.tostring(elem, 'utf-8') reparsed = minidom.parseString(rough_string) return reparsed.toprettyxml(indent=" ")
[docs]def arg_to_list(arg): if arg is None: return [] elif isinstance(arg, list): return arg else: return [arg]
[docs]def update_keys(input_dict, key_map): o = {} for k, v in input_dict.items(): if k in key_map.keys(): k = key_map[k] if isinstance(v, dict): o[k] = update_keys(v, key_map) else: o[k] = v return o
[docs]def create_dict(d0, **kwargs): o = copy.deepcopy(d0) o = merge_dict(o, kwargs, add_new_keys=True) return o
[docs]def merge_dict(d0, d1, add_new_keys=False, append_arrays=False): """Recursively merge the contents of python dictionary d0 with the contents of another python dictionary, d1. Parameters ---------- d0 : dict The input dictionary. d1 : dict Dictionary to be merged with the input dictionary. add_new_keys : str Do not skip keys that only exist in d1. append_arrays : bool If an element is a numpy array set the value of that element by concatenating the two arrays. """ if d1 is None: return d0 elif d0 is None: return d1 elif d0 is None and d1 is None: return {} od = {} for k, v in d0.items(): t0 = None t1 = None if k in d0: t0 = type(d0[k]) if k in d1: t1 = type(d1[k]) if k not in d1: od[k] = copy.deepcopy(d0[k]) elif isinstance(v, dict) and isinstance(d1[k], dict): od[k] = merge_dict(d0[k], d1[k], add_new_keys, append_arrays) elif isinstance(v, list) and isstr(d1[k]): od[k] = d1[k].split(',') elif isinstance(v, dict) and d1[k] is None: od[k] = copy.deepcopy(d0[k]) elif isinstance(v, np.ndarray) and append_arrays: od[k] = np.concatenate((v, d1[k])) elif (d0[k] is not None and d1[k] is not None) and t0 != t1: if t0 == dict or t0 == list: raise Exception('Conflicting types in dictionary merge for ' 'key %s %s %s' % (k, t0, t1)) od[k] = t0(d1[k]) else: od[k] = copy.copy(d1[k]) if add_new_keys: for k, v in d1.items(): if k not in d0: od[k] = copy.deepcopy(d1[k]) return od
[docs]def merge_list_of_dicts(listofdicts): # assumes every item in list has the same keys merged = copy.deepcopy(listofdicts[0]) for k in merged.keys(): merged[k] = [] for i in xrange(len(listofdicts)): for k in merged.keys(): merged[k].append(listofdicts[i][k]) return merged
[docs]def tolist(x): """ convenience function that takes in a nested structure of lists and dictionaries and converts everything to its base objects. This is useful for dupming a file to yaml. (a) numpy arrays into python lists >>> type(tolist(np.asarray(123))) == int True >>> tolist(np.asarray([1,2,3])) == [1,2,3] True (b) numpy strings into python strings. >>> tolist([np.asarray('cat')])==['cat'] True (c) an ordered dict to a dict >>> ordered=OrderedDict(a=1, b=2) >>> type(tolist(ordered)) == dict True (d) converts unicode to regular strings >>> type(u'a') == str False >>> type(tolist(u'a')) == str True (e) converts numbers & bools in strings to real represntation, (i.e. '123' -> 123) >>> type(tolist(np.asarray('123'))) == int True >>> type(tolist('123')) == int True >>> tolist('False') == False True """ if isinstance(x, list): return map(tolist, x) elif isinstance(x, dict): return dict((tolist(k), tolist(v)) for k, v in x.items()) elif isinstance(x, np.ndarray) or isinstance(x, np.number): # note, call tolist again to convert strings of numbers to numbers return tolist(x.tolist()) elif isinstance(x, OrderedDict): return dict(x) elif isinstance(x, np.bool_): return bool(x) elif isstr(x) or isinstance(x, np.str): x = str(x) # convert unicode & numpy strings try: return int(x) except: try: return float(x) except: if x == 'True': return True elif x == 'False': return False else: return x else: return x
[docs]def create_hpx_disk_region_string(skyDir, coordsys, radius, inclusive=0): """ """ # Make an all-sky region if radius >= 90.: return None if coordsys == "GAL": xref = skyDir.galactic.l.deg yref = skyDir.galactic.b.deg elif coordsys == "CEL": xref = skyDir.ra.deg yref = skyDir.dec.deg else: raise Exception("Unrecognized coordinate system %s" % coordsys) if inclusive: val = "DISK_INC(%.3f,%.3f,%.3f,%i)" % (xref, yref, radius, inclusive) else: val = "DISK(%.3f,%.3f,%.3f)" % (xref, yref, radius) return val
[docs]def convolve2d_disk(fn, r, sig, nstep=200): """Evaluate the convolution f'(r) = f(r) * g(r) where f(r) is azimuthally symmetric function in two dimensions and g is a step function given by: g(r) = H(1-r/s) Parameters ---------- fn : function Input function that takes a single radial coordinate parameter. r : `~numpy.ndarray` Array of points at which the convolution is to be evaluated. sig : float Radius parameter of the step function. nstep : int Number of sampling point for numeric integration. """ r = np.array(r, ndmin=1) sig = np.array(sig, ndmin=1) rmin = r - sig rmax = r + sig rmin[rmin < 0] = 0 delta = (rmax - rmin) / nstep redge = rmin[..., np.newaxis] + \ delta[..., np.newaxis] * np.linspace(0, nstep, nstep + 1) rp = 0.5 * (redge[..., 1:] + redge[..., :-1]) dr = redge[..., 1:] - redge[..., :-1] fnv = fn(rp) r = r.reshape(r.shape + (1,)) cphi = -np.ones(dr.shape) m = ((rp + r) / sig < 1) | (r == 0) rrp = r * rp sx = r ** 2 + rp ** 2 - sig ** 2 cphi[~m] = sx[~m] / (2 * rrp[~m]) dphi = 2 * np.arccos(cphi) v = rp * fnv * dphi * dr / (np.pi * sig * sig) s = np.sum(v, axis=-1) return s
[docs]def convolve2d_gauss(fn, r, sig, nstep=200): """Evaluate the convolution f'(r) = f(r) * g(r) where f(r) is azimuthally symmetric function in two dimensions and g is a 2D gaussian with standard deviation s given by: g(r) = 1/(2*pi*s^2) Exp[-r^2/(2*s^2)] Parameters ---------- fn : function Input function that takes a single radial coordinate parameter. r : `~numpy.ndarray` Array of points at which the convolution is to be evaluated. sig : float Width parameter of the gaussian. nstep : int Number of sampling point for numeric integration. """ r = np.array(r, ndmin=1) sig = np.array(sig, ndmin=1) rmin = r - 10 * sig rmax = r + 10 * sig rmin[rmin < 0] = 0 delta = (rmax - rmin) / nstep redge = (rmin[..., np.newaxis] + delta[..., np.newaxis] * np.linspace(0, nstep, nstep + 1)) rp = 0.5 * (redge[..., 1:] + redge[..., :-1]) dr = redge[..., 1:] - redge[..., :-1] fnv = fn(rp) r = r.reshape(r.shape + (1,)) sig2 = sig * sig x = r * rp / (sig2) if 'je_fn' not in convolve2d_gauss.__dict__: t = 10 ** np.linspace(-8, 8, 1000) t = np.insert(t, 0, [0]) je = special.ive(0, t) convolve2d_gauss.je_fn = UnivariateSpline(t, je, k=2, s=0) je = convolve2d_gauss.je_fn(x.flat).reshape(x.shape) #je2 = special.ive(0,x) v = (rp * fnv / (sig2) * je * np.exp(x - (r * r + rp * rp) / (2 * sig2)) * dr) s = np.sum(v, axis=-1) return s
[docs]def make_pixel_distance(shape, xpix=None, ypix=None): """Fill a 2D array with dimensions `shape` with the distance of each pixel from a reference direction (xpix,ypix) in pixel coordinates. Pixel coordinates are defined such that (0,0) is located at the center of the corner pixel. """ if np.isscalar(shape): shape = [shape, shape] if xpix is None: xpix = (shape[1] - 1.0) / 2. if ypix is None: ypix = (shape[0] - 1.0) / 2. dx = np.linspace(0, shape[1] - 1, shape[1]) - xpix dy = np.linspace(0, shape[0] - 1, shape[0]) - ypix dxy = np.zeros(shape) dxy += np.sqrt(dx[np.newaxis, :] ** 2 + dy[:, np.newaxis] ** 2) return dxy
[docs]def make_gaussian_kernel(sigma, npix=501, cdelt=0.01, xpix=None, ypix=None): """Make kernel for a 2D gaussian. Parameters ---------- sigma : float Standard deviation in degrees. """ sigma /= cdelt def fn(t, s): return 1. / (2 * np.pi * s ** 2) * np.exp( -t ** 2 / (s ** 2 * 2.0)) dxy = make_pixel_distance(npix, xpix, ypix) k = fn(dxy, sigma) k /= (np.sum(k) * np.radians(cdelt) ** 2) return k
[docs]def make_disk_kernel(radius, npix=501, cdelt=0.01, xpix=None, ypix=None): """Make kernel for a 2D disk. Parameters ---------- radius : float Disk radius in deg. """ radius /= cdelt def fn(t, s): return 0.5 * (np.sign(s - t) + 1.0) dxy = make_pixel_distance(npix, xpix, ypix) k = fn(dxy, radius) k /= (np.sum(k) * np.radians(cdelt) ** 2) return k
[docs]def make_cdisk_kernel(psf, sigma, npix, cdelt, xpix, ypix, psf_scale_fn=None, normalize=False): """Make a kernel for a PSF-convolved 2D disk. Parameters ---------- psf : `~fermipy.irfs.PSFModel` sigma : float 68% containment radius in degrees. """ sigma /= 0.8246211251235321 dtheta = psf.dtheta egy = psf.energies x = make_pixel_distance(npix, xpix, ypix) x *= cdelt k = np.zeros((len(egy), npix, npix)) for i in range(len(egy)): def fn(t): return psf.eval(i, t, scale_fn=psf_scale_fn) psfc = convolve2d_disk(fn, dtheta, sigma) k[i] = np.interp(np.ravel(x), dtheta, psfc).reshape(x.shape) if normalize: k /= (np.sum(k, axis=0)[np.newaxis, ...] * np.radians(cdelt) ** 2) return k
[docs]def make_cgauss_kernel(psf, sigma, npix, cdelt, xpix, ypix, psf_scale_fn=None, normalize=False): """Make a kernel for a PSF-convolved 2D gaussian. Parameters ---------- psf : `~fermipy.irfs.PSFModel` sigma : float 68% containment radius in degrees. """ sigma /= 1.5095921854516636 dtheta = psf.dtheta egy = psf.energies x = make_pixel_distance(npix, xpix, ypix) x *= cdelt k = np.zeros((len(egy), npix, npix)) for i in range(len(egy)): def fn(t): return psf.eval(i, t, scale_fn=psf_scale_fn) psfc = convolve2d_gauss(fn, dtheta, sigma) k[i] = np.interp(np.ravel(x), dtheta, psfc).reshape(x.shape) if normalize: k /= (np.sum(k, axis=0)[np.newaxis, ...] * np.radians(cdelt) ** 2) return k
[docs]def memoize(obj): obj.cache = {} @functools.wraps(obj) def memoizer(*args, **kwargs): key = str(args) + str(kwargs) if key not in obj.cache: obj.cache = {} obj.cache[key] = obj(*args, **kwargs) return obj.cache[key] return memoizer
[docs]def make_radial_kernel(psf, fn, sigma, npix, cdelt, xpix, ypix, psf_scale_fn=None, normalize=False, klims=None, sparse=False): """Make a kernel for a general radially symmetric 2D function. Parameters ---------- psf : `~fermipy.irfs.PSFModel` fn : callable Function that evaluates the kernel at a radial coordinate r. sigma : float 68% containment radius in degrees. """ if klims is None: egy = psf.energies else: egy = psf.energies[klims[0]:klims[1] + 1] ang_dist = make_pixel_distance(npix, xpix, ypix) * cdelt max_ang_dist = np.max(ang_dist) + cdelt #dtheta = np.linspace(0.0, (np.max(ang_dist) * 1.05)**0.5, 200)**2.0 # z = create_kernel_function_lookup(psf, fn, sigma, egy, # dtheta, psf_scale_fn) shape = (len(egy), npix, npix) k = np.zeros(shape) r99 = psf.containment_angle(energies=egy, fraction=0.997) r34 = psf.containment_angle(energies=egy, fraction=0.34) rmin = np.maximum(r34 / 4., 0.01) rmax = np.maximum(r99, 0.1) if sigma is not None: rmin = np.maximum(rmin, 0.5 * sigma) rmax = np.maximum(rmax, 2.0 * r34 + 3.0 * sigma) rmax = np.minimum(rmax, max_ang_dist) for i in range(len(egy)): rebin = min(int(np.ceil(cdelt / rmin[i])), 8) if sparse: dtheta = np.linspace(0.0, rmax[i]**0.5, 100)**2.0 else: dtheta = np.linspace(0.0, max_ang_dist**0.5, 200)**2.0 z = eval_radial_kernel(psf, fn, sigma, i, dtheta, psf_scale_fn) xdist = make_pixel_distance(npix * rebin, xpix * rebin + (rebin - 1.0) / 2., ypix * rebin + (rebin - 1.0) / 2.) xdist *= cdelt / float(rebin) #x = val_to_pix(dtheta, np.ravel(xdist)) if sparse: m = np.ravel(xdist) < rmax[i] kk = np.zeros(xdist.size) #kk[m] = map_coordinates(z, [x[m]], order=2, prefilter=False) kk[m] = np.interp(np.ravel(xdist)[m], dtheta, z) kk = kk.reshape(xdist.shape) else: kk = np.interp(np.ravel(xdist), dtheta, z).reshape(xdist.shape) # kk = map_coordinates(z, [x], order=2, # prefilter=False).reshape(xdist.shape) if rebin > 1: kk = sum_bins(kk, 0, rebin) kk = sum_bins(kk, 1, rebin) k[i] = kk / float(rebin)**2 k = k.reshape((len(egy),) + ang_dist.shape) if normalize: k /= (np.sum(k, axis=0)[np.newaxis, ...] * np.radians(cdelt) ** 2) return k
[docs]def eval_radial_kernel(psf, fn, sigma, idx, dtheta, psf_scale_fn): if fn is None: return psf.eval(idx, dtheta, scale_fn=psf_scale_fn) else: return fn(lambda t: psf.eval(idx, t, scale_fn=psf_scale_fn), dtheta, sigma)
#@memoize
[docs]def create_kernel_function_lookup(psf, fn, sigma, egy, dtheta, psf_scale_fn): z = np.zeros((len(egy), len(dtheta))) for i in range(len(egy)): if fn is None: z[i] = psf.eval(i, dtheta, scale_fn=psf_scale_fn) else: z[i] = fn(lambda t: psf.eval(i, t, scale_fn=psf_scale_fn), dtheta, sigma) return z
[docs]def create_radial_spline(psf, fn, sigma, egy, dtheta, psf_scale_fn): from scipy.ndimage.interpolation import spline_filter z = create_kernel_function_lookup( psf, fn, sigma, egy, dtheta, psf_scale_fn) sp = [] for i in range(z.shape[0]): sp += [spline_filter(z[i], order=2)] return sp
[docs]def make_psf_kernel(psf, npix, cdelt, xpix, ypix, psf_scale_fn=None, normalize=False): """ Generate a kernel for a point-source. Parameters ---------- psf : `~fermipy.irfs.PSFModel` npix : int Number of pixels in X and Y dimensions. cdelt : float Pixel size in degrees. """ egy = psf.energies x = make_pixel_distance(npix, xpix, ypix) x *= cdelt k = np.zeros((len(egy), npix, npix)) for i in range(len(egy)): k[i] = psf.eval(i, x, scale_fn=psf_scale_fn) if normalize: k /= (np.sum(k, axis=0)[np.newaxis, ...] * np.radians(cdelt) ** 2) return k
[docs]def rebin_map(k, nebin, npix, rebin): if rebin > 1: k = np.sum(k.reshape((nebin, npix * rebin, npix, rebin)), axis=3) k = k.swapaxes(1, 2) k = np.sum(k.reshape(nebin, npix, npix, rebin), axis=3) k = k.swapaxes(1, 2) k /= rebin ** 2 return k
[docs]def sum_bins(x, dim, npts): if npts <= 1: return x shape = x.shape[:dim] + (int(x.shape[dim] / npts), npts) + x.shape[dim + 1:] return np.sum(x.reshape(shape), axis=dim + 1)
[docs]def overlap_slices(large_array_shape, small_array_shape, position): """ Modified version of `~astropy.nddata.utils.overlap_slices`. Get slices for the overlapping part of a small and a large array. Given a certain position of the center of the small array, with respect to the large array, tuples of slices are returned which can be used to extract, add or subtract the small array at the given position. This function takes care of the correct behavior at the boundaries, where the small array is cut of appropriately. Parameters ---------- large_array_shape : tuple Shape of the large array. small_array_shape : tuple Shape of the small array. position : tuple Position of the small array's center, with respect to the large array. Coordinates should be in the same order as the array shape. Returns ------- slices_large : tuple of slices Slices in all directions for the large array, such that ``large_array[slices_large]`` extracts the region of the large array that overlaps with the small array. slices_small : slice Slices in all directions for the small array, such that ``small_array[slices_small]`` extracts the region that is inside the large array. """ # Get edge coordinates edges_min = [int(pos - small_shape // 2) for (pos, small_shape) in zip(position, small_array_shape)] edges_max = [int(pos + (small_shape - small_shape // 2)) for (pos, small_shape) in zip(position, small_array_shape)] # Set up slices slices_large = tuple(slice(max(0, edge_min), min(large_shape, edge_max)) for (edge_min, edge_max, large_shape) in zip(edges_min, edges_max, large_array_shape)) slices_small = tuple(slice(max(0, -edge_min), min(large_shape - edge_min, edge_max - edge_min)) for (edge_min, edge_max, large_shape) in zip(edges_min, edges_max, large_array_shape)) return slices_large, slices_small