Module taurunner.track.utils.spline_column_depth

Expand source code
import warnings
import numpy as np
from scipy.interpolate import RectBivariateSpline, splev, splrep
from scipy.integrate import quad
from importlib.resources import path as imppath
import os
import pickle as pkl

from taurunner.body import construct_earth
import taurunner as tr


with imppath('taurunner.resources.column_depth_splines', '__init__.py') as p:
    SPLINE_PATH = str(p).split('__init__.py')[0]


def column_depth(track,
                 body,
                 xi: float,
                 xf: float,
                 safe_mode=True
    ) -> float:
    '''
    params
    ______
    body      : TauRunner Body object for which you want the column depth
    xi        : Affine track parameter at which to start the integeration.
    xf        : Affine track parameter at which to end the integeration.
    safe_mode : If True make sure the error on the integral is small

    returns
    _______
    column_depth : column depth on the portion of the track from xi to xf [natural units]
    '''
    if not (xi<=xf):
        raise RuntimeError('xi must be less than or equal to xf')
    integrand = lambda x: body.get_density(track.x_to_r(x))*track.x_to_d_prime(x)*body.radius
    # find where the path intersects layer boundaries
    xx        = []
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        for r in body.layer_boundaries:
            xx = np.append(xx, track.r_to_x(r))
    # NaNs means it does not intersect the layer
    xx        = xx[np.where(~np.isnan(xx))[0]] # non-nanize
    # Remove xs before and after the integration limits
    mask      = np.where(np.logical_and(xi<xx, xx<xf))[0]
    xx        = np.hstack([[xi], xx[mask], [xf]])
    II        = []
    for xi, xf in zip(xx[:-1], xx[1:]):
        I = quad(integrand, xi, xf, full_output=1)
        if safe_mode:
            if I[0]!=0:
                if I[1]/I[0]>1e-3:
                    raise RuntimeError('Error too large')
        II.append(I[0])
    return np.sum(II)


# Precompute column depths when the object is initialized so that we are computing integrals
# every time
def column_depth_helper(track, body):
    xx            = np.linspace(0, 1, 301)
    column_depths = np.append(0, np.cumsum([column_depth(track, body, x[0], x[1]) for x in zip(xx[:-1], xx[1:])]))
    # Pad arrays to help spline stability
    npad          = 5
    xpad          = np.linspace(0.01, 0.05, npad)
    padded_xx     = np.hstack([-xpad[::-1], xx, 1+xpad])
    padded_cds    = np.hstack([np.full(npad, column_depths[0]), column_depths, np.full(npad, column_depths[-1])])
    # Make the splines
    x_to_X_tck    = splrep(padded_xx, padded_cds)
    x_to_X        = lambda x: splev(x, x_to_X_tck)
    
    return column_depths[-1], x_to_X

def get_hash(track, body):
    s = f'{body.radius}{track.depth}'
    for bd in body.layer_boundaries:
        s += str(bd)
    s = s.replace('.', '')
    hash_s = s
    #print(hash(str(s)))
    #hash_s = hash(s)
    #print(s, hash_s)
    return hash_s

def spline_fname(hash_s):
    x2X_fname = f'{SPLINE_PATH}/{hash_s}_x_to_X.pkl'
    spline_exists = os.path.isfile(x2X_fname)
    return x2X_fname, spline_exists

def construct_X2x(x2X):
    if x2X(0)==x2X(1): # there is no bijection since derivative of this function must be non-neg
        print(x2X(0))
        if x2X(0.5)!=0: # This should only happen in the case that there is no column depth
            raise ValueError('Constant X2x is only allow for tracks with no column depth')
        else:
            def X2x(x):
                raise Exception('You should not be calling this for null propagations')
    else:
        xx         = np.linspace(0, 1, 301)
        cds        = [x2X(x) for x in xx]
        cds[0]     = 0 # Sometimes spline support can be imperfect
        X2x_tck    = splrep(cds, xx)
        def X2x(X):
            return splev(X, X2x_tck)[0]
    return X2x

def set_spline(track, body, npad=5):
    hash_s                   = get_hash(track, body)
    x2X_fname, spline_exists = spline_fname(hash_s)
    if spline_exists:
        with open(x2X_fname, 'rb') as pkl_file:
            x2X_spline =  pkl.load(pkl_file)
    else:
        from taurunner.track import chord
        if track.depth==0:
            skip = True
        else:
            skip = False
        ths = np.arccos(np.linspace(-1, 1, 500))
        chs = [chord(theta=th, depth=track.depth) for th in ths]
        xx  = np.linspace(0, 1, 301)
        res = np.zeros((ths.shape[0], xx.shape[0]))
        for i, ch in enumerate(chs):
            if skip and np.cos(ch.theta)<=0:
                continue
            tot_X, x2X = column_depth_helper(ch, body)
            for j, x in enumerate(xx):
                res[i,j] = x2X(x)
        res[np.where(res<=0)] = np.min(res[np.where(res>0)])
        x2X_spline = RectBivariateSpline(np.cos(ths), xx, res, kx=2, ky=2)
        with open(x2X_fname, 'wb') as pkl_f:
            pkl.dump(x2X_spline, pkl_f)
    
    # reduce dimensionality of the spline to the desired theta    
    x2X = lambda x: x2X_spline(np.cos(track.theta), x)[0]
    tot_X = x2X(1)
    
    # Construct the X2x on the fly
    X2x = construct_X2x(x2X)
    
    #X2x = lambda X: X2x_spline(np.cos(track.theta), np.log(X))[0]
    track._column_depth_lookup[hash_s] = (tot_X, x2X, X2x)

Functions

def column_depth(track, body, xi: float, xf: float, safe_mode=True) ‑> float

params


body : TauRunner Body object for which you want the column depth xi : Affine track parameter at which to start the integeration. xf : Affine track parameter at which to end the integeration. safe_mode : If True make sure the error on the integral is small

returns


column_depth : column depth on the portion of the track from xi to xf [natural units]

Expand source code
def column_depth(track,
                 body,
                 xi: float,
                 xf: float,
                 safe_mode=True
    ) -> float:
    '''
    params
    ______
    body      : TauRunner Body object for which you want the column depth
    xi        : Affine track parameter at which to start the integeration.
    xf        : Affine track parameter at which to end the integeration.
    safe_mode : If True make sure the error on the integral is small

    returns
    _______
    column_depth : column depth on the portion of the track from xi to xf [natural units]
    '''
    if not (xi<=xf):
        raise RuntimeError('xi must be less than or equal to xf')
    integrand = lambda x: body.get_density(track.x_to_r(x))*track.x_to_d_prime(x)*body.radius
    # find where the path intersects layer boundaries
    xx        = []
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        for r in body.layer_boundaries:
            xx = np.append(xx, track.r_to_x(r))
    # NaNs means it does not intersect the layer
    xx        = xx[np.where(~np.isnan(xx))[0]] # non-nanize
    # Remove xs before and after the integration limits
    mask      = np.where(np.logical_and(xi<xx, xx<xf))[0]
    xx        = np.hstack([[xi], xx[mask], [xf]])
    II        = []
    for xi, xf in zip(xx[:-1], xx[1:]):
        I = quad(integrand, xi, xf, full_output=1)
        if safe_mode:
            if I[0]!=0:
                if I[1]/I[0]>1e-3:
                    raise RuntimeError('Error too large')
        II.append(I[0])
    return np.sum(II)
def column_depth_helper(track, body)
Expand source code
def column_depth_helper(track, body):
    xx            = np.linspace(0, 1, 301)
    column_depths = np.append(0, np.cumsum([column_depth(track, body, x[0], x[1]) for x in zip(xx[:-1], xx[1:])]))
    # Pad arrays to help spline stability
    npad          = 5
    xpad          = np.linspace(0.01, 0.05, npad)
    padded_xx     = np.hstack([-xpad[::-1], xx, 1+xpad])
    padded_cds    = np.hstack([np.full(npad, column_depths[0]), column_depths, np.full(npad, column_depths[-1])])
    # Make the splines
    x_to_X_tck    = splrep(padded_xx, padded_cds)
    x_to_X        = lambda x: splev(x, x_to_X_tck)
    
    return column_depths[-1], x_to_X
def construct_X2x(x2X)
Expand source code
def construct_X2x(x2X):
    if x2X(0)==x2X(1): # there is no bijection since derivative of this function must be non-neg
        print(x2X(0))
        if x2X(0.5)!=0: # This should only happen in the case that there is no column depth
            raise ValueError('Constant X2x is only allow for tracks with no column depth')
        else:
            def X2x(x):
                raise Exception('You should not be calling this for null propagations')
    else:
        xx         = np.linspace(0, 1, 301)
        cds        = [x2X(x) for x in xx]
        cds[0]     = 0 # Sometimes spline support can be imperfect
        X2x_tck    = splrep(cds, xx)
        def X2x(X):
            return splev(X, X2x_tck)[0]
    return X2x
def get_hash(track, body)
Expand source code
def get_hash(track, body):
    s = f'{body.radius}{track.depth}'
    for bd in body.layer_boundaries:
        s += str(bd)
    s = s.replace('.', '')
    hash_s = s
    #print(hash(str(s)))
    #hash_s = hash(s)
    #print(s, hash_s)
    return hash_s
def set_spline(track, body, npad=5)
Expand source code
def set_spline(track, body, npad=5):
    hash_s                   = get_hash(track, body)
    x2X_fname, spline_exists = spline_fname(hash_s)
    if spline_exists:
        with open(x2X_fname, 'rb') as pkl_file:
            x2X_spline =  pkl.load(pkl_file)
    else:
        from taurunner.track import chord
        if track.depth==0:
            skip = True
        else:
            skip = False
        ths = np.arccos(np.linspace(-1, 1, 500))
        chs = [chord(theta=th, depth=track.depth) for th in ths]
        xx  = np.linspace(0, 1, 301)
        res = np.zeros((ths.shape[0], xx.shape[0]))
        for i, ch in enumerate(chs):
            if skip and np.cos(ch.theta)<=0:
                continue
            tot_X, x2X = column_depth_helper(ch, body)
            for j, x in enumerate(xx):
                res[i,j] = x2X(x)
        res[np.where(res<=0)] = np.min(res[np.where(res>0)])
        x2X_spline = RectBivariateSpline(np.cos(ths), xx, res, kx=2, ky=2)
        with open(x2X_fname, 'wb') as pkl_f:
            pkl.dump(x2X_spline, pkl_f)
    
    # reduce dimensionality of the spline to the desired theta    
    x2X = lambda x: x2X_spline(np.cos(track.theta), x)[0]
    tot_X = x2X(1)
    
    # Construct the X2x on the fly
    X2x = construct_X2x(x2X)
    
    #X2x = lambda X: X2x_spline(np.cos(track.theta), np.log(X))[0]
    track._column_depth_lookup[hash_s] = (tot_X, x2X, X2x)
def spline_fname(hash_s)
Expand source code
def spline_fname(hash_s):
    x2X_fname = f'{SPLINE_PATH}/{hash_s}_x_to_X.pkl'
    spline_exists = os.path.isfile(x2X_fname)
    return x2X_fname, spline_exists