Source code for fiducia.cspline

"""
Created on Fri Mar  8 09:41:36 2019

Functions for working with cubic spline equation in matrix form.

@author: Pawel M. Kozlowski
"""

# python modules
import numpy as np
import scipy.sparse as sparse
from scipy import integrate
import matplotlib.pyplot as plt
import xarray as xr
# custom modules
import fiducia.pltDefaults
from fiducia.stats import dotVariance

# listing all functions declared in this file so that sphinx-automodapi
# correctly documents them and doesn't document imported functions.
__all__ = ["splineCoords",
           "splineCoordsInv",
           "yCoeffArr",
           "dCoeffArr",
           "dToyArr",
           "responseInterp",
           "yChiCoeffArr",
           "yChiCoeffArrEnergies",
           "fancyTrapz2",
           "segmentsArr",
           "detectorArr",
           "knotSolve",
           "reconstructSpectrum",
           ]


[docs]def splineCoords(energy, energyStart, energyEnd): r""" Convert photon energy value into normalized coordinates for a particular spline region. Parameters ---------- energy: float, numpy.ndarray Energy value to be converted into normalized spline coordinate. energyStart: float Lower bound energy for the spline region based on knot points. energyEnd: float Upper bound energy for the spline region based on knot points. normCoord: float Return value of energy converted into normalized spline coordinates. Returns ------- normCoord: float, numpy.ndarray Normalized energy coordinate(s). Notes ----- See also -------- Examples -------- """ # if energyStart > energyEnd: # raise ValueError(f"Lower bound energy {energyStart} must be less than " # f"upper bound energy {energyEnd}.") # if np.all(energy < energyStart) or np.all(energy > energyEnd): # raise ValueError(f"Given energy value {energy} is outside spline " # f"coordinate bounds {energyStart} to {energyEnd}.") normCoord = (energy - energyStart) / (energyEnd - energyStart) return normCoord
[docs]def splineCoordsInv(energyNorm, energyStart, energyEnd): r""" Given a normalized energy value and the bounds of a spline segment, return the un-normalized photon energy value. This is the inverse of splineCoords(). Parameters ---------- energyNorm: float, numpy.ndarray Normalized photon energy energyStart: float Lower bound energy for the spline region based on knot points. energyEnd: float Upper bound energy for the spline region based on knot points. Returns ------- energy: float, numpy.ndarray Absolute photon energy (un-normalized). Notes ----- See also -------- Examples -------- """ energy = (energyEnd - energyStart) * energyNorm + energyStart return energy
[docs]def yCoeffArr(energyNorm, chLen): r""" Returns the matrix M_y(t) for a given value of t in: .. math:: Y_i(t) = M_y(t) y_i + M_D(t) D_i Parameters ---------- energyNorm: float normalized photon energy for a spline section. chLen: int Number of DANTE channels (equal to number of spline knots). Returns ------- mArr: scipy.sparse.lil.lil_matrix Sparse matrix :math:`M_y(t)`. Notes ----- See also -------- Examples -------- """ coeff1 = 1 - 3 * energyNorm ** 2 + 2 * energyNorm **3 coeff2 = 3 * energyNorm ** 2 - 2 * energyNorm ** 3 mArr = sparse.diags([coeff1, coeff2], [0,1], shape=(chLen, chLen + 1), format='lil') #First and last row elements in y array are different (added by DHB 3/25/19) mArr[0,0] = 1 - energyNorm ** 3 mArr[0,1] = energyNorm ** 3 mArr[chLen-1,chLen-1] = 1 - energyNorm ** 3 mArr[chLen-1,chLen] = energyNorm ** 3 return mArr.tocsc()
[docs]def dCoeffArr(energyNorm, chLen): r""" Returns the matrix M_D(t) for a given value of t in: .. math:: Y_i(t) = M_y(t) y_i + M_D(t) D_i Parameters ---------- energyNorm: float normalized photon energy for a spline section. chLen: int Number of DANTE channels (equal to number of spline knots). Returns ------- dArr: scipy.sparse.lil.lil_matrix Sparse matrix :math:`M_D(t)`. Notes ----- See also -------- Examples -------- """ coeff1 = energyNorm - 2 * energyNorm ** 2 + energyNorm ** 3 coeff2 = -1 * energyNorm ** 2 + energyNorm ** 3 dArr = sparse.diags([coeff1, coeff2], [0,1], shape=(chLen, chLen + 1), format='lil') #First and last row elements in D array are different (added by DHB 3/25/19) dArr[0,0] = energyNorm - energyNorm ** 3 dArr[chLen-1,chLen-1] = energyNorm - energyNorm ** 2 dArr[chLen-1,chLen] = energyNorm ** 2 - energyNorm ** 3 return dArr.tocsc()
def __chi1Arr__(chLen): r""" Parameters ---------- chLen: int Number of DANTE channels (equal to number of spline knots). Returns ------- chi1: scipy.sparse.lil.lil_matrix Sparse matrix :math:`\chi_1`. Notes ----- See also -------- Examples -------- """ # chi1array in Dan Barnak's notation = TDy'' in Jim's paper chi1 = sparse.diags([1, 4, 1], [-1,0,1], shape=(chLen + 1, chLen + 1), format='lil') chi1[0, 0] = 2 chi1[-1, -1] = 2 return chi1.tocsc() def __chi3Arr__(chLen): r""" Parameters ---------- chLen: int Number of DANTE channels (equal to number of spline knots). Returns ------- chi3: scipy.sparse.lil.lil_matrix Sparse matrix :math:`\chi_3`. Notes ----- See also -------- Examples -------- """ # chi2array in Dan Barnak's notation = TDy in Jim's paper chi3 = sparse.diags([-1, 0, 1], [-1,0,1], shape=(chLen + 1, chLen + 1), format='lil') chi3[0, 0] = -1 chi3[-1, -1] = 1 return chi3.tocsc()
[docs]def dToyArr(chLen): r""" Construct matrix for converting from :math:`D_i` to :math:`y_i` vector. Parameters ---------- chLen: int Number of DANTE channels (equal to number of spline knots). Returns ------- diToyi: numpy.ndarray Matrix for converting from :math:`D_i` to :math:`y_i` vector. Notes ----- The matrix is given by: .. math:: D_i = 3 \chi_1^{-1} \chi_3 y_i See also -------- Examples -------- """ chi1Arr = __chi1Arr__(chLen) chi3Arr = __chi3Arr__(chLen) chi1ArrInv = np.linalg.inv(chi1Arr.toarray()) # constructing array for converting values from Di to yi diToyi = 3 * np.dot(chi1ArrInv, chi3Arr.toarray()) return diToyi
[docs]def responseInterp(energyNorm, energyMin, energyMax, responseFrame, channels): r""" Given a DANTE detector response as a function of energy, convert the response to normalized photon energy, t, over a given spline segment, and return interpolated response values for a given value of t. Returns an array of interpolated responses corresponding to the number of channels. Parameters ---------- energyNorm: float, numpy.ndarray normalized photon energy energyMin: float Lower bound photon energy of the spline segment over which we are normalizing. eneryMax: float Upper bound photon energy of the spline segment over which we are normalizing. responseFrame: pandas.core.frame.DataFrame DANTE channel responses as a function of photon energy (not normalized). channels: numpy.ndarray numpy array of DANTE channel numbers. Returns ------- responsesInterpd: numpy.ndarray Returns a matrix of (energyNorms, channels) of response functions. Notes ----- See also -------- Examples -------- """ chLen = len(channels) if np.shape(energyNorm): # if energyNorm is a vector, then get the length of that vector energyLen = len(energyNorm) else: # otherwise the length is just 1 energyLen = 1 # construct an array of interpolated response functions based on # number of photon energy points we want and number of DANTE channels responsesInterpd = np.zeros((energyLen, chLen)) # fetch the original array of energy values (not normalized) energyArr = responseFrame['Energy(eV)'] # converting normalized energy values to un-normalized energy values, # since our response functions are in terms of absolute photon energy energyReg = splineCoordsInv(energyNorm, energyMin, energyMax) for idx, channel in enumerate(channels): responseArr = responseFrame[channel] responsesInterpd[:,idx] = np.interp(x=energyReg, xp=energyArr, fp=responseArr) return responsesInterpd
[docs]def yChiCoeffArr(energyNorm, chLen, dToY): r""" This is the matrix corresponding to: .. math:: M_y(t) + 3 M_D(t) \chi_1^{-1} \chi_3 which describes the cubic spline interpolation of the x-ray spectrum. Parameters ---------- energyNorm: float normalized photon energy chLen: int Number of DANTE channels (equal to number of spline knots). dToY: numpy.ndarray Matrix for converting from :math:`D_i` to :math:`y_i` values in cubic spline interpolation. See dToyArr(). Returns ------- yChiArr: numpy.ndarray Returns a 2D matrix for a particular value of energyNorm. Notes ----- The matrix is given by: .. math:: M_y(t) + 3 M_D(t) \chi_1^{-1} \chi_3 See also -------- Examples -------- """ # M_y(t) yCoeff = yCoeffArr(energyNorm, chLen) # M_D(t) dCoeff = dCoeffArr(energyNorm, chLen) # folded x-ray spectrum and detector response array yChiArr = yCoeff.toarray() + np.dot(dCoeff.toarray(), dToY) return yChiArr
[docs]def yChiCoeffArrEnergies(energyNorms, chLen, dToY): r""" This is the matrix corresponding to: .. math:: M_y(t) + 3 M_D(t) \chi_1^{-1} \chi_3 which describes the cubic spline interpolation of the x-ray spectrum. energyNorms: numpy.ndarray Vector of normalized photon energies. chLen: int Number of DANTE channels (equal to number of spline knots). dToY: numpy.ndarray Matrix for converting from :math:`D_i` to :math:`y_i` values in cubic spline interpolation. See dToyArr(). Returns ------- yChiArrEnergies: numpy.ndarray Returns a 3D matrix composed of a series of 2D yChiCoeff matrices corresponding to the given energyNorm values. This matrix is indexed as (energyNorms, knotIndex, knotIndex). Notes ----- The matrix is given by: .. math:: M_y(t) + 3 M_D(t) \chi_1^{-1} \chi_3 See also -------- Examples -------- """ yChiArrEnergies = np.array([yChiCoeffArr(energyNorm, chLen, dToY) for energyNorm in energyNorms]) return yChiArrEnergies
[docs]def fancyTrapz2(energyNorms, yChis, segments, responseFrame, channels): r""" Trap rule integration of the folding between our :math:`M_{y \chi}` matrix and response function matrix, with respect to normalized photon energy, for each channel. The result should be a matrix with shape (`len(channels)`, `len(segments)`, `len(knotIndex)`). Parameters ---------- energyNorms: numpy.ndarray 1D array of normalized photon energies yChis: numpy.ndarray 3D array of :math:`M_{y \chi}` values corresponding to (energyNorms, segments, knotIndex). responses: numpy.ndarray 2D array of DANTE channel response functions corresponding to (energyNorms, channels). channels: numpy.ndarray Array of DANTE channel numbers. Returns ------- integArr : xarray.DataArray A matrix containing the folded integration of the :math:`M_{y \chi}` matrix and response function matrix, with respect to normalized photon energy. Has shape (`len(channels)`, `len(segments)`, `len(knotIndex)`). Notes ----- See also -------- Examples -------- """ shape = np.shape(yChis) segmentsLen = np.shape(segments)[0] knotsLen = shape[2] chLen = segmentsLen # initialize integArr for storing photon energy integrated values integArr = xr.DataArray(np.zeros((chLen, segmentsLen, knotsLen)), dims=['channel', 'segment', 'knot_point'], coords={'channel':channels}) # loop over relevant DANTE channels for analysis for channelIdx in np.arange(integArr['channel'].size): # loop over photon energy segments (between knot points) # print(f"Calculating for channel {channels[channelIdx]}.") for segmentNum, segment in enumerate(segments): energyMin, energyMax = segment # print(f"Calculating for segment {segments[segmentNum]}.") # loop over knot points for knotNum in np.arange(integArr['knot_point'].size): # multiplication of response by spline matrix responses = responseInterp(energyNorms, energyMin, energyMax, responseFrame, channels) # plt.plot(energyNorms, responses[:, channelIdx]) # plt.title(f"Response channel {channels[channelIdx]}, segment {segmentNum}, knot {knotNum}") # plt.show() multArr = yChis[:, segmentNum, knotNum] * responses[:, channelIdx] # integArr[channelIdx, segmentNum, knotNum] = np.trapz(y=multArr, # x=energyNorms) integVal = (energyMax - energyMin) * integrate.simps(y=multArr, x=energyNorms) integArr[channelIdx, segmentNum, knotNum] = integVal return integArr
[docs]def segmentsArr(knots): r""" Returns the bounds of each spline segment, given the spline knot points. Returns an array of tuples of (energyMin, energyMax) describing the bounds of each spline segment, given an array of spline knots (photon energies corresponding to K-edges). Parameters ---------- knots: numpy.ndarray numpy array of photon energies describing positions of spline knots. Returns ------- segments : numpy.ndarray A 1D array of tuples of (energyMin, energyMax), corresponding to the bounds of each spline segment. Has length of 'len(knots) - 1'. Notes ----- See also -------- Examples -------- """ # initializing array of segments segments = np.zeros(len(knots) - 1, dtype=object) for idx, knot in enumerate(knots): if idx == len(segments): # skipping the last knot since there are 1 fewer segments than knots continue segments[idx] = knots[idx], knots[idx + 1] return segments
[docs]def detectorArr(channels, knots, responseFrame, boundary="y0", npts=1000): r""" Matrix representing the spectrally integrated folding of the detector response with a cubic spline interpolation of the x-ray spectrum. This is applied to the measured DANTE channel signals to recover knot points :math:`y_i` of the cubic spline, which can then be used to reconstruct the inferred x-ray spectrum. Parameters ---------- channels: numpy.ndarray Array of DANTE channel numbers. knots: numpy.ndarray Array of photon energies describing positions of spline knots. responseFrame: pandas.core.frame.DataFrame DANTE channel responses as a function of photon energy (not normalized). boundary: str, optional Choose whether yGuess corresponds to :math:`y_0` (lowest photon energy) or :math:`y_{n+1}` (highest photon energy) boundary condition. This should correspond to the photon energy value given in knots. Options are 'y0' or 'yn+1'. Default 'y0'. npts: int Number of points used in computing the integral Returns ------- detArr: numpy.ndarray Matrix representing the spectrally integrated folding of the detector response with a cubic spline interpolation of the x-ray spectrum. 2D array of channels and knot points of shape (n, n). Notes ----- For each spline segment we have: .. math:: M_{stuff} = \int_0^{1} (M_y(t) + 3 M_D(t) \chi_1^{-1} \chi_3) R_d(t) dt Each spline is then summed to form the full detector matrix for recovering the knot points. See also -------- Examples -------- """ # number of DANTE channels where we have useful measurements chLen = len(channels) # initialize normalized energies array # array of normalized energies over which we do the integral energyNorms = np.linspace(0, 1, num=npts) # producing segments from knots segments = segmentsArr(knots) # calculating array for converting from values of D_i to y_i. This # is an optimization as this array is constant! dToY = dToyArr(chLen) # M_{y \chi} coefficients array corresponding to given normalized # energies. Array shape is (energyNorms, segments, knotIndex). yChis = yChiCoeffArrEnergies(energyNorms, chLen, dToY) integFoldArr = fancyTrapz2(energyNorms, yChis, segments, responseFrame, channels) # sum along segment axis, as each segment must contribute to the # overall signal. #detArr = np.sum(integFoldArr, axis=1) detArr = integFoldArr.sum(dim="segment") detArr.attrs['boundary'] = boundary if boundary == "y0": # extracting column corresponding to y0 detArrBoundaryCol = detArr.isel(knot_point=0) detArr = detArr.isel(knot_point=slice(1, None)) elif boundary == "yn+1": # extracting column corresponding to y_{n+1} detArrBoundaryCol = detArr.isel(knot_point=-1) detArr = detArr.isel(knot_point=slice(None, -1)) else: raise Exception(f"No method found for boundary {boundary}.") return detArr, detArrBoundaryCol
[docs]def knotSolve(signals, detArr, detArrBoundaryCol, detArrVarianceBoundaryCol, detArrInv, stdDetArrInv, signalsUncertainty=None, yGuess=1e-77, npts=1000): r""" Get knot points :math:`y_i` from measured DANTE signals :math:`S_d`. Parameters ---------- signals: numpy.ndarray numpy array of DANTE measured signal for each channel at a particular point in time. detArr : numpy.ndarray Matrix representing the spectrally integrated folding of the detector response with a cubic spline interpolation of the x-ray spectrum. 2D array of channels and knot points of shape (n, n). detArrBoundaryCol : xarray.DataArray Column of cublic spline matrix corresponding to the knots at the boundary chosen with `boundary`. detArrVarianceBoundaryCol: xarray.DataArray Column of variances in the cublic spline matrix corresponding to the knots at the boundary chosen with `boundary`. detArrInv : xarray.DataArray Inversion of detArr, with the column corresponding to boundary removed so detArr is invertible. stdDetArrInv: xarray.DataArray Array of the standard deviation of each element in detArrInv based on variance using the 'responseUncertaintyFrame' propagated with Monte Carlo. signalsUncertainty: xarray.DataArray, optional numpy array of the uncertainty of the DANTE measured signal for each channel at a particular point in time. The default is None. yGuess: float, optional Guess for position of boundary knot point. Default is 1e-77. npts: int, optional Number of points used in computing the integral. Default is 1000. Returns ------- knotsY : numpy.ndarray Array of knot point intensity values with yGuess appended. knotsYVariance: numpy.ndarray Array with each element corresponding to the variance of the same element in 'knotsY'. Notes ----- See also -------- Examples -------- """ #subtract boundary col from signals signalsy0 = signals - yGuess * detArrBoundaryCol signalsy0Variance = signalsUncertainty**2 + yGuess**2 * detArrVarianceBoundaryCol signalsy0Uncertainty = np.sqrt(signalsy0Variance) # applying inverted array to signals to recover knot points #huge difference between xarray.dot and np.dot. See issue in Gitlab knotsY = np.dot(detArrInv, signalsy0) #knotsYother = detArrInv.dot(signalsy0, {subscripts:'i,i'}) #print("knot diff", knotsYother-knotsY) knotsYVariance = dotVariance(detArrInv, signalsy0, stdDetArrInv, signalsy0Uncertainty) return knotsY, knotsYVariance
[docs]def reconstructSpectrum(chLen, knots, knotsY, knotsYUncertainty=None, npts=1000, plot=False): r""" Reconstruct the inferred DANTE spectrum given the knot points :math:`y_i` obtained from knotSolve(). Parameters ---------- chLen: int Number of DANTE channels (equal to number of spline knots). knots: list, numpy.ndarray List or array of knot point photon energy value. See knotFind(). knotsY: numpy.ndarray Array of knot point intensity values with yGuess appended. See knotSolve() and analyzeSpectrum(). knotsYUncertainty: numpy.ndarray Array of knot point intensity uncertainty values with yGuess appended. See knotSolve() and analyzeSpectrum(). The default is None. npts: int Number of points used in computing the integral. The default is 1000. plot: Bool Flag for plotting unfolded spectrum. The default is False. Returns ------- photonEnergies: numpy.ndarray Photon energy axis of unfolded spectrum. intensities: numpy.ndarray Spectral intensity axis of unfolded spectrum. intensitiesVariance: numpy.ndarray Uncertaitny (1 :math:`\sigma`) on spectral intensity values. Notes ----- See also -------- Examples -------- """ #check for no uncertainty provided. if knotsYUncertainty is None: #give them uncertainty of 0 knotsYUncertainty = np.zeros(knotsY.shape) # initialize normalized energies array energyNorms = np.linspace(0, 1, num=npts) # producing segments from knots segments = segmentsArr(knots) # getting cubic spline matrices for reconstructing spline dToY = dToyArr(chLen) yChis = yChiCoeffArrEnergies(energyNorms, chLen, dToY) #set a constant simple uncertainty for the yChis. #TOOD replace with something physical yChisUncertainty = np.zeros(yChis.shape) # recovering the spectral values spectrum = np.dot(yChis, knotsY) spectrumVariance = dotVariance(yChis, knotsY, yChisUncertainty, knotsYUncertainty) # print(spectrumVariance) # initialize arrays for holding photon energies and corresponding # intensities of stitched spectrum. photonEnergies = np.array([]) intensities = np.array([]) intensitiesVariance = np.array([]) # loop over segments to reconstruct spectrum for segNum, segment in enumerate(segments): energyMin, energyMax = segment energyRegs = splineCoordsInv(energyNorms, energyMin, energyMax) # appending each segment to form a single array of spectral # values. We need to cut the last element from each array as the # last element overlaps the first element of the next segment. photonEnergies = np.append(photonEnergies, energyRegs[:-1]) intensities = np.append(intensities, spectrum[:-1, segNum]) intensitiesVariance = np.append(intensitiesVariance, spectrumVariance[:-1, segNum]) if plot: plt.plot(energyRegs, spectrum[:, segNum]) if plot: plt.ylabel('Spectrum (GW/sr/eV)') plt.xlabel('Photon Energy (eV)') plt.title('Spectrum from cubic spline knots') plt.show() return photonEnergies, intensities, intensitiesVariance