Source code for fiducia.error

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tues June 16 13:48:21 2020

Utilities for calculating response uncertainty

@author: Myles Brophy
"""

# python modules
import numpy as np
import xarray as xr
import pandas as pd
import matplotlib.pyplot as plt
import datetime
#custom modules
from fiducia.cspline import splineCoordsInv, segmentsArr, dToyArr, yChiCoeffArrEnergies, responseInterp, detectorArr
from fiducia.misc import areDataFramesCompatible
from fiducia.stats import trapzVariance, gradientVariance, interpVariance
from fiducia.response import knotFind
from fiducia.loader import loadResponses, loadResponseUncertainty
# listing all functions declared in this file so that sphinx-automodapi
# correctly documents them and doesn't document imported functions.
__all__ = ["detectorErrMC",
           "knotVarianceFind",
           "responseInterpVariance",
           "fancyTrapz2Variance",
           "detectorArrVariance",
           "detectorUncertainty"
           ]



    
[docs]def detectorErrMC(detArr, detArrVariance, samples=10000, boundary="y0", MChistogram=False): r""" Monte Carlo simulation and statistics to determine cubic spline uncertainty. Calculate the cubic spline matrix uncertainty using a Monte Carlo simulation and statistics on the MC's output. Parameters ---------- detArr: numpy.ndarray Matrix representing the spectrally integrated folding of the detector response with a cubic spline interpolation of the x-ray spectrum. See 'cspline.detectorArr()'. detArrVariance: numpy.ndarray A DataFrame containing the uncertainty for each Dante channel for the photon energy range that detArr spans. samples : int, optional Number of MCs amples to run. Default is `10000`. boundary : str, optional Choose whether yGuess corresponds to :math:`y_0` (lowest photon energy) or :math:`y_{n+1}` (highest photon energy) boundary condition. This should correspond to the photon energy value given in knots. Options are `y0` or `yn+1`. Default is 'y0'. MChistogram : bool, optional Plot histograms corresponding to each variant of detArr generated with Monte Carlo uncertainty propagation. Default is `False`. Returns ------- stdErrorMatrix : numpy.ndarray A numpy.ndarray with the standard deviation of the inverted matrices generated using random weights based on the channel uncertainty. Raises ------ Exception If `boundary` doesn't equal `y0` or `yn+1`. ValueError If the shapes of `detArr` and `detUncertaintyArr` aren't equal. Notes ----- See also -------- Examples -------- """ if detArr.shape != detArrVariance.shape: raise ValueError(f"Shape of the cubic spline matrix ({detArr.shape}) is not equal to"+ f" the shape of the detector uncertainty matrix ({detArrVariance.shape})") #square root the uncertainty matrix because it is sigma^2 and we want sigma detArrUncertainty = np.sqrt(detArrVariance) #make a 3d matrix, where each 2d slice is an error matrix generated by MC print("Generating "+str(samples) + " MC samples...") error = xr.DataArray(np.zeros((samples,)+detArr.shape), dims=['sample', 'channel', 'knot_point']) for s in range(samples): error[s] = detArr + np.random.normal(loc=0, scale=1, size=detArr.shape)*detArrUncertainty print("Finished generating MC samples.") detArrInv = xr.DataArray(np.linalg.inv(detArr), dims=['channel', 'knot_point'], attrs={'boundary':boundary}) invError = xr.DataArray(np.zeros(error.shape), dims=['sample', 'channel', 'knot_point']) # #invert each 2d slice of the error matrix for sample in range(samples): invErrSlice = np.linalg.inv(error[sample]) invError[sample] = invErrSlice # #find std of each element compared to other samples stdDetArrInv = invError.std(dim="sample") stdDetArrInv.attrs['boundary'] = boundary if MChistogram: for ch in invError['channel']: for knot in invError['knot_point']: # print(len(invError.sel(channel=ch, knot_point=knot).values)) plt.hist(invError.sel(channel=ch, knot_point=knot).values, samples, alpha=1, label=f'Channel {ch.values} knot {knot.values}') plt.axvline(detArrInv.sel(channel=ch, knot_point=knot).values, label='DetArrInv', color='red') plt.axvline(detArrInv.sel(channel=ch,knot_point=knot).values + stdDetArrInv.sel(channel=ch, knot_point=knot).values, label='DetArrInv+sigma', color='purple') plt.axvline(detArrInv.sel(channel=ch, knot_point=knot).values - stdDetArrInv.sel(channel=ch, knot_point=knot).values, label='DetArrInv-sigma', color='green') plt.legend(loc='upper right') plt.title("Detector MC") plt.show() return detArrInv, stdDetArrInv
#TODO check to see if this has any effect on final error bars. Pawel thinks not.
[docs]def knotVarianceFind(channels, responseUncertaintyFrame=None, forceKnot=np.array([]), knotBoundaryY=1e-77, boundary='y0'): r""" Modification of response.knotFind() Parameters ---------- channels: numpy.ndarray Array of DANTE channel numbers. responseUncertaintyFrame: pandas.core.frame.DataFrame, optional DataFrame holding percent uncertainties of DANTE channel responses as a function of photon energy (not normalized). The default is `None`. forceKnot : TYPE, optional DESCRIPTION. The default is `np.array([])`. knotBoundaryY : float, optional Guess for position of y_0 or y_{n+1} knot point. Default is 1e-77. boundary: str, optional Choose whether yGuess corresponds to :math:`y_0` (lowest photon energy) or :math:`y_{n+1}` (highest photon energy) boundary condition. This should correspond to the photon energy value given in knots. Options are `y0` or `yn+1`. Default `y0`. Returns ------- knotUncertainty: numpy.ndarray An array of uncertainty in knot points, with each element corresponding to a channel or boundary condition. See :func:`response.knotFind`. Notes ----- See also -------- Examples -------- """ #modified from response.knotFind knotsUncertainty = np.zeros(len(channels)) #check if uncertainty frame is passed. if responseUncertaintyFrame is None: #return ndarray of length channels + 1 for boundary condition return np.zeros(len(channels)+1) for idx, channel in enumerate(channels): # if forceKnot isn't an empty list, then we go about forcing the user # provided values. if forceKnot.size != 0: if channel in forceKnot[:,0]: # if the channel is forced by the user then put the user provided # photon energy into the knots array forceIdx = np.where(forceKnot[:,0] == channel) knotsUncertainty[idx] = forceKnot[forceIdx, 1] else: # finding largest negative gradient, which should correpond to the # K-edge of the DANTE channel filter. grad = -gradientVariance(responseUncertaintyFrame[channel]) maxIndex = np.argmax(grad) knotsUncertainty[idx] = responseUncertaintyFrame['Energy(eV)'][maxIndex] else: # finding largest negative gradient, which should correpond to the # K-edge of the DANTE channel filter. grad = -gradientVariance(responseUncertaintyFrame[channel]) maxIndex = np.argmax(grad) knotsUncertainty[idx] = responseUncertaintyFrame['Energy(eV)'][maxIndex] if boundary == 'y0': knotsUncertaintyAppend = np.append([knotBoundaryY], knotsUncertainty) elif boundary == 'yn+1': knotsUncertaintyAppend = np.append(knotsUncertainty, [knotBoundaryY]) else: raise Exception(f"No method found for boundary {boundary}.") return knotsUncertaintyAppend
[docs]def responseInterpVariance(energyNorm, energyMin, energyMax, responseUncertaintyFrame, channels): r""" Given a DANTE detector response as a function of energy, convert the response to normalized photon energy, t, over a given spline segment, and return interpolated response values for a given value of t. Returns an array of interpolated responses corresponding to the number of channels. Parameters ---------- energyNorm: float, numpy.ndarray normalized photon energy energyMin: float Lower bound photon energy of the spline segment over which we are normalizing. eneryMax: float Upper bound photon energy of the spline segment over which we are normalizing. responseFrame: pandas.core.frame.DataFrame DANTE channel responses as a function of photon energy (not normalized). channels: numpy.ndarray numpy array of DANTE channel numbers. Returns ------- responsesInterpdVariance: numpy.ndarray Returns a matrix of (energyNorms, channels) of response functions. Notes ----- See also -------- cspline.repsonseInterp Examples -------- """ chLen = len(channels) if np.shape(energyNorm): # if energyNorm is a vector, then get the length of that vector energyLen = len(energyNorm) else: # otherwise the length is just 1 energyLen = 1 # construct an array of interpolated response functions based on # number of photon energy points we want and number of DANTE channels responsesInterpdVariance = np.zeros((energyLen, chLen)) # fetch the original array of energy values (not normalized) energyArr = responseUncertaintyFrame['Energy(eV)'] # converting normalized energy values to un-normalized energy values, # since our response functions are in terms of absolute photon energy energyReg = splineCoordsInv(energyNorm, energyMin, energyMax) for idx, channel in enumerate(channels): responseUncertaintyArr = responseUncertaintyFrame[channel] responsesInterpdVariance[:,idx] = interpVariance(x=energyReg, xp=energyArr, fpUnc=responseUncertaintyArr) return responsesInterpdVariance
[docs]def fancyTrapz2Variance(energyNorms, yChis, segments, responseUncertaintyFrame, channels, interpProp=True): r""" Calculate the variance when propogating uncertainties through :func:`fiducia.cspline.fancyTrapz2`. Parameters ---------- energyNorms : numpy.ndarray Array of normalized energies over which the integral is computed. yChis : numpy.ndarray 3D array corresponding to the :math:`M_{y \chi}` coefficients. Array shape corresponds to (`energyNorms`, `chLen`, `dToY`). See :func:`fiducia.error.detectorArrVariance` segments : numpy.ndarray Array of segments produced by :func:`segmentsArr` with the knots responseUncertaintyFrame : pandas.core.frame.DataFrame DataFrame holding uncertainty percentages of DANTE channel responses as a function of photon energy (not normalized). channels : numpy.ndarray Array of DANTE channel numbers. interpProp : bool, optional Boolean to decide if :func:`error.responseInterpVariance` should be used. If `False, :func:`cspline.responseInterp()` is used, speeding up the calculation. Note that the uncertainty is would not be propagated correctly if `False`. With future optimizations, this option to choose may be removed. Default is `True`. Returns ------- integArrVariance : xarray.Dat A matrix containing the folded integration of the :math:`M_{y \chi}` matrix and response function uncertainty matrix, with respect to normalized photon energy. Has shape (`len(channels)`, `len(segments)`, `len(knotIndex)`). Notes ----- See also -------- cspline.fancyTrapz2() Examples -------- """ shape = np.shape(yChis) segmentsLen = np.shape(segments)[0] knotsLen = shape[2] chLen = segmentsLen # initialize integArr for storing photon energy integrated values integArrVariance = xr.DataArray(np.zeros((chLen, segmentsLen, knotsLen)), dims=['channel', 'segment', 'knot_point'], coords={'channel':channels}) # loop over relevant DANTE channels for analysis for channelIdx in np.arange(chLen): # loop over photon energy segments (between knot points) for segmentNum, segment in enumerate(segments): energyMin, energyMax = segment # loop over knot points for knotNum in np.arange(knotsLen): # multiplication of response by spline matrix #use interpolation propagation by default, but runs slower #might not have much an effect. For now we'll let the user decide #need to optimize reponseInterpVariance if interpProp: responsesVariance = responseInterpVariance(energyNorms, energyMin, energyMax, responseUncertaintyFrame, channels) responsesUncertainty = np.sqrt(responsesVariance) else: responsesVariance = responseInterp(energyNorms, energyMin, energyMax, responseUncertaintyFrame, channels) multArr = np.abs(yChis[:, segmentNum, knotNum] * responsesUncertainty[:, channelIdx]) integVar = (energyMax - energyMin) ** 2 * trapzVariance(multArr, x=energyNorms) integArrVariance[channelIdx, segmentNum, knotNum] = integVar return integArrVariance
[docs]def detectorArrVariance(channels, knots, responseUncertaintyFrame, boundary="y0", npts=1000): r""" Propagates uncertanity through :func:`cspline.detectorArr() to find the variance in :math:`M_{int}`. Parameters ---------- channels : numpy.ndarray Array of DANTE channel numbers. knots : numpy.ndarray Array of photon energies describing positions of spline knots. responseUncertaintyFrame : pandas.core.frame.DataFrame DataFrame holding uncertainty percentages of DANTE channel responses as a function of photon energy (not normalized). npts : int, optional Number of points used in computing the integral. The default is 1000. Returns ------- detArrVariance : xarray.DataArray 2D array of channels and knot points uncertainties of shape `(n, n+1)`. detArrVarianceBoundaryCol : xarray.DataArray Column of variances in the cublic spline matrix corresponding to the knots at the boundary chosen with `boundary`. Notes ----- Covariances between segments is not currently accounted for. This covariance should be small compared to the other uncertainties, but should be noted. See also -------- cspline.detectorArr() Examples -------- """ # number of DANTE channels where we have useful measurements chLen = len(channels) # initialize normalized energies array # array of normalized energies over which we do the integral energyNorms = np.linspace(0, 1, num=npts) # producing segments from knotsUncertainty. Use the same segmentsArr #because there is no calculation done segments = segmentsArr(knots) # calculating array for converting from values of D_i to y_i. This # is an optimization as this array is constant! dToY = dToyArr(chLen) # M_{y \chi} coefficients array corresponding to given normalized # energies. Array shape is (energyNorms, segments, knotIndex). yChis = yChiCoeffArrEnergies(energyNorms, chLen, dToY) print("running fancytrapz2Variance") integFoldArrVariance = fancyTrapz2Variance(energyNorms, yChis, segments, responseUncertaintyFrame, channels) print("finished fancytrapz2") # sum along segment axis, as each segment must contribute to the # overall signal. #detArrVariance = np.sum(integFoldArrVariance, axis=1) detArrVariance = integFoldArrVariance.sum(dim="segment") detArrVariance.attrs['boundary'] = boundary if boundary == "y0": # extracting column corresponding to y0 detArrVarianceBoundaryCol = detArrVariance.isel(knot_point=0) detArrVariance = detArrVariance.isel(knot_point=slice(1, None)) elif boundary == "yn+1": # extracting column corresponding to y_{n+1} detArrVarianceBoundaryCol = detArrVariance.isel(knot_point=-1) detArrVariance = detArrVariance.isel(knot_point=slice(None, -1)) else: raise Exception(f"No method found for boundary {boundary}.") return detArrVariance, detArrVarianceBoundaryCol
[docs]def detectorUncertainty(channels, responseFile, responseUncertaintyFile=None, boundary="y0", npts=1000, samples=1000, MChistogram=False, saveDataset=True, csplineDatasetFile=''): r""" Finds the cspline detector matrix, it`s inverse matrix and std matrix using Monte Carlo uncertainty propagation. Propagates response uncertainties through Parameters ---------- channels: numpy.ndarray Array of DANTE channel numbers. responseFile: str Path to the `.csv` holding DANTE channel responses as a function of photon energy (not normalized). responseUncertaintyFile: str, optional Path to the `.csv` holding DANTE channel response uncertainties as a function of photon energy. Uncertainty values provided as percentages. boundary: str, optional Choose whether yGuess corresponds to :math:`y_0` (lowest photon energy) or :math:`y_{n+1}` (highest photon energy) boundary condition. This should correspond to the photon energy value given in knots. Options are `y0` or `yn+1`. Default 'y0'. npts: int, optional Number of points used in computing the integral. Default is 1000. samples: int, optional Number of samples to generate during Monte Carlo propagation. See :func:`error.detectorErrMC`. Default is 1000. Returns ------- detArr : xarray.DataArray Matrix representing the spectrally integrated folding of the detector response with a cubic spline interpolation of the x-ray spectrum. 2D array of channels and knot points of shape (n, n). detArrBoundaryCol : xarray.DataArray Column of cublic spline matrix corresponding to the knots at the boundary chosen with `boundary`. detArrVarianceBoundaryCol: xarray.DataArray Column of variances in the cublic spline matrix corresponding to the knots at the boundary chosen with `boundary`. detArrInv : xarray.DataArray Inversion of detArr, with the column corresponding to boundary removed so detArr is invertible. stdDetArrInv : xarray.DataArray Array of the standard deviation of each element in detArrInv based on variance using the `responseUncertaintyFrame` propagated with Monte Carlo. Notes ----- See also -------- Examples -------- """ #load response functions responseFrame = loadResponses(channels, responseFile) #create responseUncertainty with 0s if not given. if responseUncertaintyFile is None: responseUncertaintyFrame = responseFrame.copy() cols = list(responseFrame.columns.values) cols.remove('Energy(eV)') for col in cols: responseUncertaintyFrame.loc[:,col] = 0 else: responseUncertaintyFrame = loadResponseUncertainty(responseFrame, responseUncertaintyFile) knots = knotFind(channels, responseFrame) #knotsVariance = knotVarianceFind(channels, responseUncertaintyFrame) if not areDataFramesCompatible(channels, responseFrame, responseUncertaintyFrame): raise ValueError("Response frame and response uncertainty frame are not compatible. Check formats.") detArr, detArrBoundaryCol = detectorArr(channels, knots, responseFrame, boundary, npts) detArrVariance, detArrVarianceBoundaryCol = detectorArrVariance(channels, knots, responseUncertaintyFrame, boundary, npts) detArrInv, stdDetArrInv = detectorErrMC(detArr, detArrVariance, samples, boundary, MChistogram) #combine into Dataset csplineDataset = xr.Dataset(data_vars={'detArr' : detArr, 'detArrBoundaryCol' : detArrBoundaryCol, 'detArrVarianceBoundaryCol' : detArrVarianceBoundaryCol, 'detArrInv' : detArrInv, 'stdDetArrInv' : stdDetArrInv}, attrs={'boundary' : boundary, 'channels' : np.asarray(channels), 'yGuess' : 0, 'responseFile' : responseFile, 'responseUncertaintyFile' : responseUncertaintyFile, 'generatedDatetime' : str(datetime.datetime.now())}) if saveDataset: #save as netCDF as recommended by http://xarray.pydata.org/en/stable/io.html#netcdf if not csplineDatasetFile: csplineDatasetFile = 'csplineDataset_' + str(datetime.datetime.now().date()) + '.nc' csplineDataset.to_netcdf(csplineDatasetFile) return csplineDataset