Source code for delensalot.sims.sims_lib

"""sims/sims_lib.py: library for collecting and handling simulations. Eventually, delensalot needs a `get_sim_pmap()` and `get_sim_tmap()`, which are the maps of the observed sky. But data may come as cls, unlensed alms, ..., . So this module allows to start analysis with,
    * cls,
    * alms_unl
    * alm_len + noise
    * obs_sky
    
and is a mapper between them, so that `get_sim_pmap()` and `get_sim_tmap()` always returns observed maps.
"""

import os
from os.path import join as opj
import numpy as np, healpy as hp

import logging
log = logging.getLogger(__name__)

import lenspyx
from lenspyx.lensing import get_geom as lp_get_geom
from plancklens.sims import phas
from delensalot.core import cachers
from delensalot.config.metamodel import DEFAULT_NotAValue as DNaV

import delensalot
from delensalot.utils import load_file, cli


def klm2plm(klm, lmax):
    k2p = 0.5 * np.arange(lmax + 1) * np.arange(1, lmax + 2, dtype=float)
    return hp.almxfl(klm, cli(k2p))

def dlm2plm(dlm, lmax):
    assert 0, 'check factor'
    LL = np.arange(0,lmax+1,1)
    factor = np.sqrt(LL*(LL+1))
    return hp.almxfl(dlm, cli(factor))

def clk2clp(clk, lmax):
    assert 0, 'check factor'
    LL = np.arange(0,lmax+1,1)
    factor = (LL*(LL+1)/2)**2
    return hp.almxfl(clk, cli(factor))

def cld2clp(cld, lmax):
    assert 0, 'check factor'
    LL = np.arange(0,lmax+1,1)
    factor = LL*(LL+1)
    return hp.almxfl(cld, cli(factor))


def get_dirname(s):
    return s.replace('(', '').replace(')', '').replace('{', '').replace('}', '').replace(' ', '').replace('\'', '').replace('\"', '').replace(':', '_').replace(',', '_').replace('[', '').replace(']', '')

def dict2roundeddict(d):
    for k,v in d.items():
        d[k] = np.around(v,3)
    return d

class iso_white_noise:
    """class for generating very simple isotropic white noise
    """
    def __init__(self, nlev, lmax=DNaV, libdir=DNaV, fns=DNaV, spin=DNaV, space=DNaV, geominfo=DNaV, libdir_suffix=DNaV):
        self.geominfo = geominfo
        if geominfo == DNaV:
            self.geominfo = ('healpix', {'nside':2048})
        self.geom_lib = get_geom(geominfo)
        self.libdir = libdir
        self.spin = spin
        self.lmax = lmax
        self.space = space
        if libdir == DNaV:
            self.nlev = nlev
            assert libdir_suffix != DNaV, 'must give libdir_suffix'
            nlev_round = dict2roundeddict(self.nlev)
            self.libdir_phas = os.environ['SCRATCH']+'/simulation/{}/{}/phas/{}/'.format(libdir_suffix, get_dirname(str(geominfo)), get_dirname(str(sorted(nlev_round.items()))))
            self.pix_lib_phas = phas.pix_lib_phas(self.libdir_phas, 3, (self.geom_lib.npix(),))
        else:
            if fns == DNaV:
                assert 0, "must provide fns"
            self.fns = fns

        self.cacher = cachers.cacher_mem(safe=True)


    def get_sim_noise(self, simidx, space, field, spin=2):
        """_summary_

        Args:
            simidx (_type_): _description_
            space (_type_): _description_
            field (_type_): _description_
            spin (int, optional): _description_. Defaults to 2.

        Returns:
            _type_: _description_
        """        
        if space == 'alm' and spin == 2:
            assert 0, "I don't think you want qlms ulms."
        if field == 'temperature' and spin == 2:
            assert 0, "I don't think you want spin-2 temperature."
        if field == 'temperature' and 'T' not in self.nlev:
            assert 0, "need to provide T key in nlev"
        if field == 'polarization' and 'P' not in self.nlev:
            assert 0, "need to provide P key in nlev"
        fn = 'noise_space{}_spin{}_field{}_{}'.format(space, spin, field, simidx)
        if not self.cacher.is_cached(fn):
            if self.libdir == DNaV:
                if self.geominfo[0] == 'healpix':
                    vamin = np.sqrt(hp.nside2pixarea(self.geominfo[1]['nside'], degrees=True)) * 60
                else:
                    ## FIXME this is a rough estimate, based on total sky coverage / npix()
                    vamin =  np.sqrt(4*np.pi) * (180/np.pi) / self.geom_lib.npix() * 60
                if field == 'polarization':
                    noise1 = self.nlev['P'] / vamin * self.pix_lib_phas.get_sim(int(simidx), idf=1)
                    noise2 = self.nlev['P'] / vamin * self.pix_lib_phas.get_sim(int(simidx), idf=2) # TODO this always produces qu-noise in healpix geominfo?
                    noise = np.array([noise1, noise2])
                    if space == 'map':
                        if spin == 0:
                            alm_buffer = self.geom_lib.map2alm_spin(noise, spin=2, lmax=self.lmax, mmax=self.lmax, nthreads=4)
                            noise1 = self.geom_lib.alm2map(alm_buffer[0], lmax=self.lmax, mmax=self.lmax, nthreads=4)
                            noise2 = self.geom_lib.alm2map(alm_buffer[1], lmax=self.lmax, mmax=self.lmax, nthreads=4)
                            noise = np.array([noise1, noise2])
                    elif space == 'alm':
                        noise = self.geom_lib.map2alm_spin(noise, spin=2, lmax=self.lmax, mmax=self.lmax, nthreads=4)
                elif field == 'temperature':
                    noise = self.nlev['T'] / vamin * self.pix_lib_phas.get_sim(int(simidx), idf=0)
                    if space == 'alm':
                        noise = self.geom_lib.map2alm(noise, lmax=self.lmax, mmax=self.lmax, nthreads=4)
            else:
                if field == 'polarization':
                    if self.spin == 2:
                        noise1 = load_file(opj(self.libdir, self.fns['Q'].format(simidx)))
                        noise2 = load_file(opj(self.libdir, self.fns['U'].format(simidx)))
                    elif self.spin == 0:
                        noise1 = load_file(opj(self.libdir, self.fns['E'].format(simidx)))
                        noise2 = load_file(opj(self.libdir, self.fns['B'].format(simidx)))
                    noise = np.array([noise1, noise2])
                    if self.space == 'map':
                        if space == 'alm':
                            if self.spin == 0:
                                noise1 = self.geom_lib.map2alm(noise[0], lmax=self.lmax, mmax=self.lmax, nthreads=4)
                                noise2 = self.geom_lib.map2alm(noise[1], lmax=self.lmax, mmax=self.lmax, nthreads=4)
                                noise = np.array([noise1, noise2])
                            elif self.spin == 2:
                                noise = self.geom_lib.map2alm_spin(noise, spin=self.spin, lmax=self.lmax, mmax=self.lmax, nthreads=4)
                        elif space == 'map':
                            if self.spin != spin:
                                if self.spin == 0:
                                    alm_buffer1 = self.geom_lib.map2alm(noise[0], lmax=self.lmax, mmax=self.lmax, nthreads=4)
                                    alm_buffer2 = self.geom_lib.map2alm(noise[1], lmax=self.lmax, mmax=self.lmax, nthreads=4)
                                    noise = self.geom_lib.alm2map_spin([alm_buffer1,alm_buffer2], lmax=self.lmax, spin=spin, mmax=self.lmax, nthreads=4)
                                elif self.spin == 2:
                                    alm_buffer = self.geom_lib.map2alm_spin(noise, spin=self.spin, lmax=self.lmax, mmax=self.lmax, nthreads=4)
                                    noise1 = self.geom_lib.alm2map(alm_buffer[0], lmax=self.lmax, mmax=self.lmax, nthreads=4)
                                    noise2 = self.geom_lib.alm2map(alm_buffer[1], lmax=self.lmax, mmax=self.lmax, nthreads=4)
                                    noise = np.array([noise1, noise2])
                    elif self.space == 'alm':
                        if space == 'map':
                            if spin == 0:
                                noise1 = self.geom_lib.map2alm(noise[0], lmax=self.lmax, mmax=self.lmax, nthreads=4)
                                noise2 = self.geom_lib.map2alm(noise[1], lmax=self.lmax, mmax=self.lmax, nthreads=4)
                                noise = np.array([noise1, noise2])
                            elif spin == 2:
                                noise = self.geom_lib.alm2map_spin(noise, spin=spin, lmax=self.lmax, mmax=self.lmax, nthreads=4)       
                elif field == 'temperature':
                    noise = np.array(load_file(opj(self.libdir, self.fns['T'].format(simidx))))
                    if self.space == 'map':
                        if space == 'alm':
                            noise = self.geom_lib.map2alm(noise, lmax=self.lmax, mmax=self.lmax, nthreads=4)
                    elif self.space == 'alm':
                        if space == 'map':
                            noise = self.geom_lib.alm2map(noise, lmax=self.lmax, mmax=self.lmax, nthreads=4)
            self.cacher.cache(fn, noise)  
        return self.cacher.load(fn)


[docs]class Cls: """class for accessing CAMB-like file for CMB power spectra, optionally a distinct file for the lensing potential """ def __init__(self, lmax=DNaV, phi_lmax=DNaV, CMB_fn=DNaV, phi_fn=DNaV, phi_field='potential'): assert lmax != DNaV, "need to provide lmax" self.lmax = lmax self.phi_lmax = phi_lmax if phi_lmax == DNaV: self.phi_lmax = lmax + 1024 if CMB_fn == DNaV: self.CMB_fn = opj(os.path.dirname(delensalot.__file__), 'data', 'cls', 'FFP10_wdipole_lenspotentialCls.dat') self.CAMB_file = load_file(self.CMB_fn) else: self.CMB_fn = CMB_fn self.CAMB_file = load_file(CMB_fn) if phi_fn == 'None': self.phi_fn = None elif phi_fn == DNaV: self.phi_fn = self.CMB_fn buff = load_file(self.phi_fn) if self.phi_fn.endswith('npy'): if len(buff) > 1: self.phi_file = load_file(self.phi_fn)[:,1] else: self.phi_file = load_file(self.phi_fn) else: self.phi_file = load_file(self.phi_fn)['pp'] self.phi_field = phi_field # assuming that CAMB file is 'potential' else: self.phi_fn = phi_fn buff = load_file(self.phi_fn) if self.phi_fn.endswith('npy'): if len(buff) > 1: self.phi_file = load_file(self.phi_fn)[:,1] else: self.phi_file = load_file(self.phi_fn) else: self.phi_file = load_file(self.phi_fn)['pp'] self.phi_field = phi_field log.info("phi_fn is {}".format(self.phi_fn)) self.cacher = cachers.cacher_mem(safe=True) def get_TEBunl(self, simidx): fn = 'cls_{}'.format(simidx) if not self.cacher.is_cached(fn): ClT, ClE, ClB, ClTE = self.CAMB_file['tt'][:self.lmax+1], self.CAMB_file['ee'][:self.lmax+1], self.CAMB_file['bb'][:self.lmax+1], self.CAMB_file['te'][:self.lmax+1] self.cacher.cache(fn, np.array([ClT, ClE, ClB, ClTE])) return self.cacher.load(fn) def get_sim_clphi(self, simidx): fn = 'clphi_{}'.format(simidx) if not self.cacher.is_cached(fn): ClP = self.phi_file[:self.phi_lmax+1] self.cacher.cache(fn, np.array(ClP)) return self.cacher.load(fn)
[docs]class Xunl: """class for generating unlensed CMB and phi realizations from power spectra """ def __init__(self, lmax, cls_lib=DNaV, libdir=DNaV, fns=DNaV, fnsP=DNaV, libdir_phi=DNaV, phi_field='potential', phi_space=DNaV, phi_lmax=DNaV, space=DNaV, geominfo=DNaV, isfrozen=False, spin=DNaV): self.geominfo = geominfo if geominfo == DNaV: self.geominfo = ('healpix', {'nside':2048}) self.geom_lib = get_geom(self.geominfo) self.libdir = libdir self.space = space self.spin = spin self.lmax = lmax self.libdir_phi = libdir_phi self.phi_lmax = phi_lmax if phi_field == DNaV: self.phi_field = 'potential' else: self.phi_field = phi_field self.phi_space = phi_space if libdir_phi == DNaV: # need being generated if cls_lib == DNaV: self.cls_lib = Cls(lmax=lmax, phi_field=self.phi_field, phi_lmax=self.phi_lmax) else: self.cls_lib = cls_lib self.phi_lmax = self.cls_lib.phi_lmax if libdir != DNaV: if self.space == DNaV: assert 0, 'need to give space (map or alm)' self.fns = fns if self.fns == DNaV: assert 0, 'need to give fns' if self.spin == DNaV: assert 0, 'need to give spin' else: if cls_lib == DNaV: self.cls_lib = Cls(lmax=lmax, phi_field=self.phi_field, phi_lmax=self.phi_lmax) else: self.cls_lib = cls_lib self.phi_lmax = self.cls_lib.phi_lmax if libdir_phi != DNaV: self.fnsP = fnsP if self.fnsP == DNaV: assert 0, 'need to give fnsP' if self.phi_space == DNaV: assert 0, 'need to give phi_space (map or alm)' else: #TODO make sure I didn't screw up phi_lmax if 'nside' in self.geominfo[1]: geom_lmax = 3*self.geominfo[1]['nside'] elif 'lmax' in self.geominfo[1]: geom_lmax = self.geominfo[1]['lmax'] else: geom_lmax = lmax + 1024 self.phi_lmax = np.min([lmax + 1024, geom_lmax]) self.isfrozen = isfrozen self.cacher = cachers.cacher_mem(safe=True)
[docs] def get_sim_unl(self, simidx, space, field, spin=2): """returns an unlensed simulation field (temp,pol) in space (map, alm) and as spin (0,2). Note, spin is only applicable for pol, and returns QU for spin=2, and EB for spin=0. Args: simidx (_type_): _description_ space (_type_): _description_ field (_type_): _description_ spin (int, optional): _description_. Defaults to 2. Returns: _type_: _description_ """ if space == 'alm' and spin == 2: assert 0, "I don't think you want qlms ulms." if field == 'temperature' and spin == 2: assert 0, "I don't think you want spin-2 temperature." fn = 'unl_space{}_spin{}_field{}_{}'.format(space, spin, field, simidx) if not self.cacher.is_cached(fn): if self.libdir == DNaV: Cls = self.cls_lib.get_TEBunl(simidx) unl = np.array(self.cl2alm(Cls, field=field, seed=simidx)) if space == 'map': if field == 'polarization': if spin == 2: unl = self.geom_lib.alm2map_spin(unl, lmax=self.lmax, spin=spin, mmax=self.lmax, nthreads=4) elif spin == 0: unl1 = self.geom_lib.alm2map(unl[0], lmax=self.lmax, mmax=self.lmax, nthreads=4) unl2 = self.geom_lib.alm2map(unl[1], lmax=self.lmax, mmax=self.lmax, nthreads=4) unl = np.array([unl1, unl2]) elif field == 'temperature': unl = self.geom_lib.alm2map(unl, lmax=self.lmax, mmax=self.lmax, nthreads=4) else: if field == 'polarization': if self.spin == 2: unl1 = load_file(opj(self.libdir, self.fns['Q'].format(simidx))) unl2 = load_file(opj(self.libdir, self.fns['U'].format(simidx))) elif self.spin == 0: unl1 = load_file(opj(self.libdir, self.fns['E'].format(simidx))) unl2 = load_file(opj(self.libdir, self.fns['B'].format(simidx))) unl = np.array([unl1, unl2]) if self.space == 'map': if space == 'alm': if self.spin == 2: unl = self.geom_lib.map2alm_spin(unl, spin=self.spin, lmax=self.lmax, mmax=self.lmax, nthreads=4) elif self.spin == 0: unl1 = self.geom_lib.map2alm(unl[0], lmax=self.lmax, mmax=self.lmax, nthreads=4) unl2 = self.geom_lib.map2alm(unl[1], lmax=self.lmax, mmax=self.lmax, nthreads=4) unl = np.array([unl1, unl2]) elif space == 'map': if self.spin != spin: if self.spin == 0: alm_buffer1 = self.geom_lib.map2alm(unl[0], lmax=self.lmax, mmax=self.lmax, nthreads=4) alm_buffer2 = self.geom_lib.map2alm(unl[1], lmax=self.lmax, mmax=self.lmax, nthreads=4) unl = self.geom_lib.alm2map_spin([alm_buffer1,alm_buffer2], lmax=self.lmax, spin=spin, mmax=self.lmax, nthreads=4) elif self.spin == 2: alm_buffer = self.geom_lib.map2alm_spin(unl, spin=self.spin, lmax=self.lmax, mmax=self.lmax, nthreads=4) unl1 = self.geom_lib.alm2map(alm_buffer[0], lmax=self.lmax, mmax=self.lmax, nthreads=4) unl2 = self.geom_lib.alm2map(alm_buffer[1], lmax=self.lmax, mmax=self.lmax, nthreads=4) unl = np.array([unl1, unl2]) elif self.space == 'alm': if space == 'map': if spin == 0: unl = self.geom_lib.alm2map(unl, lmax=self.lmax, mmax=self.lmax, nthreads=4) elif spin == 2: unl = self.geom_lib.alm2map_spin(unl, spin=spin, lmax=self.lmax, mmax=self.lmax, nthreads=4) elif field == 'temperature': unl = np.array(load_file(opj(self.libdir, self.fns['T'].format(simidx)))) if self.space == 'map': if space == 'alm': unl = self.geom_lib.map2alm(unl, lmax=self.lmax, mmax=self.lmax, nthreads=4) elif self.space == 'alm': if space == 'map': unl = self.geom_lib.alm2map(unl, lmax=self.lmax, mmax=self.lmax, nthreads=4) self.cacher.cache(fn, unl) return self.cacher.load(fn)
[docs] def get_sim_phi(self, simidx, space): """ Args: simidx (_type_): _description_ space (_type_): _description_ spin (int, optional): _description_. Defaults to 2. Returns: _type_: _description_ """ fn = 'phi_space{}_{}'.format(space, simidx) if not self.cacher.is_cached(fn): if self.libdir_phi == DNaV: log.info('generating phi from cl') Clpf = self.cls_lib.get_sim_clphi(simidx) self.phi_field = self.cls_lib.phi_field Clp = self.clpf2clppot(Clpf) phi = self.clp2plm(Clp, simidx) if space == 'map': phi = self.geom_lib.alm2map(phi, lmax=self.phi_lmax, mmax=self.phi_lmax, nthreads=4) else: # print('phi field at {} is {}'.format(opj(self.libdir_phi, self.fnsP.format(simidx)), self.phi_field)) if self.phi_space == 'map': phi = np.array(load_file(opj(self.libdir_phi, self.fnsP.format(simidx))), dtype=float) else: phi = np.array(load_file(opj(self.libdir_phi, self.fnsP.format(simidx))), dtype=complex) if self.phi_space == 'map': self.geominfo_phi = ('healpix', {'nside':hp.npix2nside(phi.shape[0])}) self.geomlib_phi = get_geom(self.geominfo_phi) phi = self.geomlib_phi.map2alm(phi, lmax=self.phi_lmax, mmax=self.phi_lmax, nthreads=4) phi = self.pflm2plm(phi) if space == 'map': phi = self.geom_lib.alm2map(phi, lmax=self.phi_lmax, mmax=self.phi_lmax, nthreads=4) self.cacher.cache(fn, phi) return self.cacher.load(fn)
def pflm2plm(self, philm): if self.phi_field == 'convergence': return klm2plm(philm, self.phi_lmax) elif self.phi_field == 'deflection': return dlm2plm(philm, self.phi_lmax) elif self.phi_field == 'potential': return philm def clpf2clppot(self, cl): if self.phi_field == 'convergence': return clk2clp(cl, self.phi_lmax) elif self.phi_field == 'deflection': return cld2clp(cl, self.phi_lmax) elif self.phi_field == 'potential': return cl def cl2alm(self, cls, field, seed): np.random.seed(int(seed)) # check if this starting point is random if field == 'polarization': alms = hp.synalm(cls, self.lmax, new=True) return alms[1:] elif field == 'temperature': alm = hp.synalm(cls, self.lmax) return alm[0] def clp2plm(self, clp, seed): np.random.seed(int(seed)) plm = hp.synalm(clp, self.phi_lmax) return plm
[docs]class Xsky: """class for generating lensed CMB and phi realizations from unlensed realizations, using lenspyx for the lensing operation """ def __init__(self, lmax, unl_lib=DNaV, libdir=DNaV, fns=DNaV, spin=DNaV, epsilon=1e-7, space=DNaV, geominfo=DNaV, isfrozen=False, lenjob_geominfo=DNaV): self.geominfo = geominfo if geominfo == DNaV: self.geominfo = ('healpix', {'nside':2048}) self.geom_lib = get_geom(self.geominfo) self.libdir = libdir self.fns = fns self.spin = spin self.lmax = lmax self.space = space if libdir == DNaV: # need being generated if unl_lib == DNaV: self.unl_lib = Xunl(lmax=lmax, geominfo=self.geominfo) else: self.unl_lib = unl_lib if epsilon == DNaV: self.epsilon = 1e-7 else: self.epsilon = epsilon else: if self.spin == DNaV: assert 0, 'need to give spin' if self.space == DNaV: assert 0, 'need to give space (map or alm)' if fns == DNaV: assert 0, 'you need to provide fns' self.isfrozen = isfrozen if lenjob_geominfo == DNaV: self.lenjob_geominfo = ('thingauss', {'lmax':lmax+1024, 'smax':3}) else: self.lenjob_geominfo = lenjob_geominfo self.lenjob_geomlib = lp_get_geom(self.lenjob_geominfo) self.cacher = cachers.cacher_mem(safe=True)
[docs] def get_sim_sky(self, simidx, space, field, spin=2): """returns a lensed simulation field (temperature, polarization) in space (map, alm) and as spin (0,2). Note, spin is only applicable for pol, and returns QU for spin=2, and EB for spin=0. Args: simidx (_type_): _description_ space (_type_): _description_ field (_type_): _description_ spin (int, optional): _description_. Defaults to 2. Returns: _type_: _description_ """ if space == 'alm' and spin == 2: assert 0, "I don't think you want qlms ulms." if field == 'temperature' and spin == 2: assert 0, "I don't think you want spin-2 temperature." fn = 'sky_space{}_spin{}_field{}_{}'.format(space, spin, field, simidx) log.info('requesting "{}"'.format(fn)) if not self.cacher.is_cached(fn): fn_other = 'sky_space{}_spin{}_field{}_{}'.format(space, self.spin, field, simidx) if not self.cacher.is_cached(fn_other): log.info('..nothing cached..') if self.libdir == DNaV: log.info('.., generating.') unl = self.unl_lib.get_sim_unl(simidx, space='alm', field=field, spin=0) philm = self.unl_lib.get_sim_phi(simidx, space='alm') if field == 'polarization': sky = self.unl2len(unl, philm, spin=2, epsilon=self.epsilon) if space == 'map': if spin == 0: alm_buffer = self.lenjob_geomlib.map2alm_spin(sky, spin=2, lmax=self.lmax, mmax=self.lmax, nthreads=4) sky1 = self.geom_lib.alm2map(alm_buffer[0], lmax=self.lmax, mmax=self.lmax, nthreads=4) sky2 = self.geom_lib.alm2map(alm_buffer[1], lmax=self.lmax, mmax=self.lmax, nthreads=4) sky = np.array([sky1, sky2]) elif spin == 2: sky = self.lenjob_geomlib.map2alm_spin(np.copy(sky), spin=2, lmax=self.lmax, mmax=self.lmax, nthreads=4) sky = self.geom_lib.alm2map_spin(np.copy(sky), lmax=self.lmax, spin=2, mmax=self.lmax, nthreads=4) elif space == 'alm': sky = self.lenjob_geomlib.map2alm_spin(sky, lmax=self.lmax, spin=2, mmax=self.lmax, nthreads=4) elif field == 'temperature': sky = self.unl2len(unl, philm, spin=0, epsilon=self.epsilon) if space == 'map': sky = self.lenjob_geomlib.map2alm(np.copy(sky), lmax=self.lmax, mmax=self.lmax, nthreads=4) sky = self.geom_lib.alm2map(np.copy(sky), lmax=self.lmax, mmax=self.lmax, nthreads=4) elif space == 'alm': sky = self.lenjob_geomlib.map2alm(sky, lmax=self.lmax, mmax=self.lmax, nthreads=4) else: log.info('.., but stored on disk.') if field == 'polarization': if self.spin == 2: sky1 = load_file(opj(self.libdir, self.fns['Q'].format(simidx))) sky2 = load_file(opj(self.libdir, self.fns['U'].format(simidx))) elif self.spin == 0: sky1 = load_file(opj(self.libdir, self.fns['E'].format(simidx))) sky2 = load_file(opj(self.libdir, self.fns['B'].format(simidx))) sky = np.array([sky1, sky2]) if self.space == 'map': if space == 'alm': if self.spin == 0: sky1 = self.geom_lib.map2alm(sky[0], lmax=self.lmax, mmax=self.lmax, nthreads=4) sky2 = self.geom_lib.map2alm(sky[1], lmax=self.lmax, mmax=self.lmax, nthreads=4) sky = np.array([sky1, sky2]) else: sky = self.geom_lib.map2alm_spin(sky, spin=self.spin, lmax=self.lmax, mmax=self.lmax, nthreads=4) elif space == 'map': if self.spin != spin: if self.spin == 0: alm_buffer1 = self.geom_lib.map2alm(sky[0], lmax=self.lmax, mmax=self.lmax, nthreads=4) alm_buffer2 = self.geom_lib.map2alm(sky[1], lmax=self.lmax, mmax=self.lmax, nthreads=4) sky = self.geom_lib.alm2map_spin([alm_buffer1,alm_buffer2], lmax=self.lmax, spin=spin, mmax=self.lmax, nthreads=4) elif self.spin == 2: alm_buffer = self.geom_lib.map2alm_spin(sky, spin=self.spin, lmax=self.lmax, mmax=self.lmax, nthreads=4) sky1 = self.geom_lib.alm2map(alm_buffer[0], lmax=self.lmax, mmax=self.lmax, nthreads=4) sky2 = self.geom_lib.alm2map(alm_buffer[1], lmax=self.lmax, mmax=self.lmax, nthreads=4) sky = np.array([sky1, sky2]) elif self.space == 'alm': if space == 'map': if spin == 0: sky1 = self.geom_lib.alm2map(sky[0], lmax=self.lmax, mmax=self.lmax, nthreads=4) sky2 = self.geom_lib.alm2map(sky[1], lmax=self.lmax, mmax=self.lmax, nthreads=4) sky = np.array([sky1, sky2]) else: sky = self.geom_lib.alm2map_spin(sky, spin=spin, lmax=self.lmax, mmax=self.lmax, nthreads=4) elif field == 'temperature': sky = np.array(load_file(opj(self.libdir, self.fns['T'].format(simidx)))) if self.space == 'map': if space == 'alm': sky = self.geom_lib.map2alm(sky, lmax=self.lmax, mmax=self.lmax, nthreads=4) elif self.space == 'alm': if space == 'map': sky = self.geom_lib.alm2map(sky, lmax=self.lmax, mmax=self.lmax, nthreads=4) else: log.info('found "{}"'.format(fn_other)) sky = self.cacher.load(fn_other) if space == 'map': sky = self.geom_lib.alm2map_spin(self.lenjob_geomlib.map2alm_spin(sky, spin=self.spin, lmax=self.lmax, mmax=self.lmax, nthreads=4), lmax=self.lmax, spin=spin, mmax=self.lmax, nthreads=4) self.cacher.cache(fn, np.array(sky)) return self.cacher.load(fn)
def unl2len(self, Xlm, philm, **kwargs): ll = np.arange(0,self.unl_lib.phi_lmax+1,1) return lenspyx.alm2lenmap_spin(Xlm, hp.almxfl(philm, np.sqrt(ll*(ll+1))), geometry=self.lenjob_geominfo, **kwargs)
[docs]class Xobs: """class for generating observed CMB realizations from sky maps together with a noise realization and transfer function to mimick an experiment """ def __init__(self, lmax, maps=DNaV, transfunction=DNaV, len_lib=DNaV, unl_lib=DNaV, epsilon=DNaV, noise_lib=DNaV, libdir=DNaV, fns=DNaV, nlev=DNaV, libdir_noise=DNaV, fnsnoise=DNaV, spin=DNaV, space=DNaV, geominfo=DNaV, field=DNaV, cacher=DNaV, libdir_suffix=DNaV): self.geominfo = geominfo if geominfo == DNaV: self.geominfo = ('healpix', {'nside':2048}) self.geom_lib = get_geom(self.geominfo) self.libdir = libdir self.fns = fns self.spin = spin self.lmax = lmax self.space = space self.noise_lib = noise_lib self.fullsky = True #FIXME make it dependent on userdata: if Xobs is set via simhandler, then check if user data is full sky or not. self.cacher = cachers.cacher_mem(safe=True) self.maps = maps if np.all(self.maps != DNaV): fn = 'obs_space{}_spin{}_field{}_{}'.format(space, spin, field, 0) self.cacher.cache(fn, np.array(self.maps)) else: if libdir == DNaV: if len_lib == DNaV: self.len_lib = Xsky(unl_lib=unl_lib, lmax=lmax, libdir=libdir, fns=fns, space=space, epsilon=epsilon, geominfo=geominfo) else: self.len_lib = len_lib if noise_lib == DNaV: if libdir_noise == DNaV: if nlev == DNaV: assert 0, "need nlev for generating noise" self.noise_lib = iso_white_noise(nlev=nlev, lmax=lmax, fns=fnsnoise,libdir=libdir_noise, space=space, geominfo=self.geominfo, libdir_suffix=libdir_suffix) if np.all(transfunction == DNaV): assert 0, 'need to give transfunction' self.transfunction = transfunction elif libdir != DNaV: if self.space == DNaV: assert 0, 'need to give space (map or alm)' self.fns = fns if fns == DNaV: assert 0, 'you need to provide fns' if self.spin == DNaV: assert 0, 'need to give spin'
[docs] def get_sim_obs(self, simidx, space, field, spin=2): """_summary_ Args: simidx (_type_): _description_ space (_type_): _description_ field (_type_): _description_ spin (int, optional): _description_. Defaults to 2. Returns: _type_: _description_ """ if space == 'alm' and spin == 2: assert 0, "I don't think you want qlms ulms." if field == 'temperature' and spin == 2: assert 0, "I don't think you want spin-2 temperature." if not self.fullsky: assert self.spin == spin, "can only provide existing data" assert self.space == space, "can only provide existing data" fn = 'obs_space{}_spin{}_field{}_{}'.format(space, spin, field, simidx) log.info('requesting "{}"'.format(fn)) fn_otherspin = 'obs_space{}_spin{}_field{}_{}'.format(space, self.spin, field, simidx) fn_otherspace = '' fn_otherspacespin = '' if self.space == 'alm': fn_otherspace = 'obs_space{}_spin{}_field{}_{}'.format('alm', 0, field, simidx) elif self.space == 'map': fn_otherspace = 'obs_space{}_spin{}_field{}_{}'.format('map', spin, field, simidx) if self.space == 'alm': fn_otherspacespin = 'obs_space{}_spin{}_field{}_{}'.format('alm', 0, field, simidx) elif self.space == 'map': fn_otherspacespin = 'obs_space{}_spin{}_field{}_{}'.format('map', self.spin, field, simidx) if not self.cacher.is_cached(fn) and not self.cacher.is_cached(fn_otherspin) and not self.cacher.is_cached(fn_otherspacespin) and not self.cacher.is_cached(fn_otherspace): log.info('..nothing cached..') if self.libdir == DNaV: # sky data comes from len_lib, and we add noise log.info('.., generating.') obs = self.sky2obs( np.copy(self.len_lib.get_sim_sky(simidx, spin=spin, space=space, field=field)), np.copy(self.noise_lib.get_sim_noise(simidx, spin=spin, field=field, space=space)), spin=spin, space=space, field=field) elif self.libdir != DNaV: # observed data is somewhere log.info('.., but stored on disk.') if field == 'polarization': if self.spin == 2: obs1 = load_file(opj(self.libdir, self.fns['Q'].format(simidx))) obs2 = load_file(opj(self.libdir, self.fns['U'].format(simidx))) elif self.spin == 0: obs1 = load_file(opj(self.libdir, self.fns['E'].format(simidx))) obs2 = load_file(opj(self.libdir, self.fns['B'].format(simidx))) obs = np.array([obs1, obs2]) if self.space == 'map': if space == 'map': if self.spin != spin: if self.spin == 0: alm_buffer1 = self.geom_lib.map2alm(obs[0], lmax=self.lmax, mmax=self.lmax, nthreads=4) alm_buffer2 = self.geom_lib.map2alm(obs[1], lmax=self.lmax, mmax=self.lmax, nthreads=4) obs = self.geom_lib.alm2map_spin([alm_buffer1,alm_buffer2], lmax=self.lmax, spin=spin, mmax=self.lmax, nthreads=4) elif self.spin == 2: alm_buffer = self.geom_lib.map2alm_spin(obs, spin=self.spin, lmax=self.lmax, mmax=self.lmax, nthreads=4) obs1 = self.geom_lib.alm2map(alm_buffer[0], lmax=self.lmax, mmax=self.lmax, nthreads=4) obs2 = self.geom_lib.alm2map(alm_buffer[1], lmax=self.lmax, mmax=self.lmax, nthreads=4) obs = np.array([obs1, obs2]) elif space == 'alm': if self.spin == 0: obs1 = self.geom_lib.map2alm(obs[0], lmax=self.lmax, mmax=self.lmax, nthreads=4) obs2 = self.geom_lib.map2alm(obs[1], lmax=self.lmax, mmax=self.lmax, nthreads=4) obs = np.array([obs1, obs2]) else: obs = self.geom_lib.map2alm_spin(obs, spin=self.spin, lmax=self.lmax, mmax=self.lmax, nthreads=4) elif self.space == 'alm': if space == 'map': if spin == 0: obs1 = self.geom_lib.alm2map(obs[0], lmax=self.lmax, mmax=self.lmax, nthreads=4) obs2 = self.geom_lib.alm2map(obs[1], lmax=self.lmax, mmax=self.lmax, nthreads=4) obs = np.array([obs1, obs2]) else: obs = self.geom_lib.alm2map_spin(obs, lmax=self.lmax, spin=spin, mmax=self.lmax, nthreads=4) elif field == 'temperature': obs = np.array(load_file(opj(self.libdir, self.fns['T'].format(simidx)))) if self.space == 'map': if space == 'alm': obs = self.geom_lib.map2alm(obs, lmax=self.lmax, mmax=self.lmax, nthreads=4) elif self.space == 'alm': if space == 'map': obs = self.geom_lib.alm2map(obs, lmax=self.lmax, mmax=self.lmax, nthreads=4) self.cacher.cache(fn, obs) self.cacher.cache(fn, obs) elif self.cacher.is_cached(fn): log.info('found "{}"'.format(fn)) pass elif self.cacher.is_cached(fn_otherspin): log.info('found "{}"'.format(fn_otherspin)) obs = np.array(self.cacher.load(fn_otherspin)) if space == 'map': if self.spin == 2: obs1 = self.geom_lib.map2alm(obs[0], lmax=self.lmax, mmax=self.lmax, nthreads=4) obs2 = self.geom_lib.map2alm(obs[1], lmax=self.lmax, mmax=self.lmax, nthreads=4) obs = np.array([obs1, obs2]) obs = self.geom_lib.alm2map_spin(obs, lmax=self.lmax, spin=self.spin, mmax=self.lmax, nthreads=4) else: obs = self.geom_lib.map2alm_spin(obs, spin=self.spin, lmax=self.lmax, mmax=self.lmax, nthreads=4) obs1 = self.geom_lib.alm2map(obs[0], lmax=self.lmax, mmax=self.lmax, nthreads=4) obs2 = self.geom_lib.alm2map(obs[1], lmax=self.lmax, mmax=self.lmax, nthreads=4) obs = np.array([obs1, obs2]) self.cacher.cache(fn, obs) elif self.cacher.is_cached(fn_otherspace): log.info('found "{}"'.format(fn_otherspace)) obs = np.array(self.cacher.load(fn_otherspace)) if field == 'polarization': if self.space == 'alm': if spin == 0: obs1 = self.geom_lib.alm2map(obs[0], lmax=self.lmax, mmax=self.lmax, nthreads=4) obs2 = self.geom_lib.alm2map(obs[1], lmax=self.lmax, mmax=self.lmax, nthreads=4) obs = np.array([obs1, obs2]) elif spin == 2: obs = self.geom_lib.alm2map_spin(obs, lmax=self.lmax, spin=spin, mmax=self.lmax, nthreads=4) elif self.space == 'map': if self.spin == 0: alm_buffer1 = self.geom_lib.map2alm(obs[0], lmax=self.lmax, mmax=self.lmax, nthreads=4) alm_buffer2 = self.geom_lib.map2alm(obs[1], lmax=self.lmax, mmax=self.lmax, nthreads=4) obs = np.array([alm_buffer1, alm_buffer2]) elif self.spin == 2: obs = self.geom_lib.map2alm_spin(obs, spin=self.spin, lmax=self.lmax, mmax=self.lmax, nthreads=4) elif field == 'temperature': if self.space == 'alm': obs = self.geom_lib.alm2map(obs, lmax=self.lmax, mmax=self.lmax, nthreads=4) elif self.space == 'map': obs = self.geom_lib.map2alm(obs, lmax=self.lmax, mmax=self.lmax, nthreads=4) self.cacher.cache(fn, obs) elif self.cacher.is_cached(fn_otherspacespin): log.info('found "{}"'.format(fn_otherspacespin)) obs = np.array(self.cacher.load(fn_otherspacespin)) if self.space == 'alm': obs = self.geom_lib.alm2map_spin(obs, lmax=self.lmax, spin=spin, mmax=self.lmax, nthreads=4) elif self.space == 'map': obs = self.geom_lib.map2alm_spin(obs, spin=self.spin, lmax=self.lmax, mmax=self.lmax, nthreads=4) self.cacher.cache(fn, obs) return self.cacher.load(fn)
def sky2obs(self, sky, noise, spin, space, field): if field == 'polarization': if space == 'map': if spin == 0: sky1 = self.geom_lib.map2alm(sky[0], lmax=self.lmax, mmax=self.lmax, nthreads=4) sky2 = self.geom_lib.map2alm(sky[1], lmax=self.lmax, mmax=self.lmax, nthreads=4) sky = np.array([sky1, sky2]) elif spin == 2: sky = self.geom_lib.map2alm_spin(sky, spin=spin, lmax=self.lmax, mmax=self.lmax, nthreads=4) hp.almxfl(sky[0], self.transfunction, inplace=True) hp.almxfl(sky[1], self.transfunction, inplace=True) if space == 'map': if spin == 0: sky1 = self.geom_lib.alm2map(sky[0], lmax=self.lmax, mmax=self.lmax, nthreads=4) sky2 = self.geom_lib.alm2map(sky[1], lmax=self.lmax, mmax=self.lmax, nthreads=4) sky = np.array([sky1, sky2]) elif spin == 2: sky = np.array(self.geom_lib.alm2map_spin(sky, spin=spin, lmax=self.lmax, mmax=self.lmax, nthreads=4)) return sky + noise else: return sky + noise elif field == 'temperature': if space == 'map': sky = self.geom_lib.map2alm(sky, lmax=self.lmax, mmax=self.lmax, nthreads=4) hp.almxfl(sky, self.transfunction, inplace=True) if space == 'map': return np.array(self.geom_lib.alm2map(sky, lmax=self.lmax, mmax=self.lmax, nthreads=4)) + noise else: return sky + noise def get_sim_noise(self, simidx, space, field, spin=2): return self.noise_lib.get_sim_noise(simidx, spin=spin, space=space, field=field)
[docs]class Simhandler: """Entry point for data handling and generating simulations. Data can be cl, unl, len, or obs, .. and alms or maps. Simhandler connects the individual libraries and decides what can be generated. E.g.: If obs data provided, len data cannot be generated. This structure makes sure we don't "hallucinate" data """ def __init__(self, flavour, space, geominfo=DNaV, maps=DNaV, field=DNaV, cls_lib=DNaV, unl_lib=DNaV, len_lib=DNaV, obs_lib=DNaV, noise_lib=DNaV, libdir=DNaV, libdir_noise=DNaV, libdir_phi=DNaV, fns=DNaV, fnsnoise=DNaV, fnsP=DNaV, lmax=DNaV, transfunction=DNaV, nlev=DNaV, spin=0, CMB_fn=DNaV, phi_fn=DNaV, phi_field=DNaV, phi_space=DNaV, epsilon=1e-7, phi_lmax=DNaV, libdir_suffix=DNaV, lenjob_geominfo=DNaV, cacher=cachers.cacher_mem(safe=True)): """Entry point for simulation data handling. Simhandler() connects the individual librariers together accordingly, depending on the provided data. It never stores data on disk itself, only in memory. It never 'hallucinates' data, i.e. if obs data provided, it will not generate len data. Args: flavour (str): Can be in ['obs', 'sky', 'unl'] and defines the type of data provided. space (str): Can be in ['map', 'alm', 'cl'] and defines the space of the data provided. maps (np.array, optional): These maps will be put into the cacher directly. They are used for settings in which no data is generated or accesed on disk, but directly provided (like in `delensalot.anafast()`) Defaults to DNaV. geominfo (tuple, optional): Lenspyx geominfo descriptor, describes the geominfo of the data provided (e.g. `('healpix', 'nside': 2048)). Defaults to DNaV. field (str, optional): the type of data provided, can be in ['temperature', 'polarization']. Defaults to DNaV. libdir (str, optional): directory of the data provided. Defaults to DNaV. libdir_noise (str, optional): directory of the noise provided. Defaults to DNaV. libdir_phi (str, optional): directory of the lensing potential provided. Defaults to DNaV. fns (dict with str with formatter, optional): file names of the data provided. It expects `{'T': <filename{simidx}.something>, 'Q': <filename{simidx}.something>, 'U': <filename{simidx}.something>}`, where `{simidx}` is used by the libraries to format the simulation index into the name. Defaults to DNaV. fnsnoise (dict with str with formatter, optional): file names of the noise provided. It expects `{'T': <filename{simidx}.something>, 'Q': <filename{simidx}.something>, 'U': <filename{simidx}.something>}`, where `{simidx}` is used by the libraries to format the simulation index into the name. Defaults to DNaV. fnsP (str with formatter, optional): file names of the lensing potential provided. It expects `<filename{simidx}.something>, where `{simidx}` is used by the libraries to format the simulation index into the name. Defaults to DNaV. lmax (int, optional): Maximum l of the data provided. Defaults to DNaV. transfunction(np.array, optional): transfer function. Defaults to DNaV. nlev (dict, optional): noise level of the individual fields. It expects `{'T': <value>, 'P': <value>}. Defaults to DNaV. spin (int, optional): the spin of the data provided. Defaults to 0. Always defaults to 0 for temperature. CMB_fn (str, optional): path+name of the file of the power spectra of the CMB. Defaults to DNaV. phi_fn (str, optional): path+name of the file of the power spectrum of the lensing potential. Defaults to DNaV. phi_field (str, optional): the type of potential provided, can be in ['potential', 'deflection', 'convergence']. This simulation library will automatically rescale the field, if needded. Defaults to DNaV. phi_space (str, optional): can be in ['map', 'alm', 'cl'] and defines the space of the lensing potential provided.. Defaults to DNaV. phi_lmax (_type_, optional): the maximum multipole of the lensing potential. if simulation library perfroms lensing, it is advisable that `phi_lmax` is somewhat larger than `lmax` (+ ~512-1024). Defaults to DNaV. epsilon (float, optional): Lenspyx lensing accuracy. Defaults to 1e-7. """ self.spin = spin self.lmax = lmax self.phi_lmax = phi_lmax self.flavour = flavour self.space = space self.nlev = nlev self.maps = maps self.transfunction = transfunction if space == 'map': if flavour == 'obs': if np.all(maps == DNaV): assert libdir != DNaV, "need to provide libdir" assert fns != DNaV, 'you need to provide fns' assert lmax != DNaV, "need to provide lmax" assert spin != DNaV, "need to provide spin" else: assert spin != DNaV, "need to provide spin" assert lmax != DNaV, "need to provide lmax" assert field != DNaV, "need to provide field" self.obs_lib = Xobs(maps=maps, space=space, transfunction=transfunction, lmax=lmax, libdir=libdir, fns=fns, spin=spin, geominfo=geominfo, field=field, libdir_suffix=libdir_suffix) if obs_lib == DNaV else obs_lib self.noise_lib = self.obs_lib.noise_lib self.libdir = self.obs_lib.libdir self.fns = self.obs_lib.fns if flavour == 'sky': assert libdir != DNaV, "need to provide libdir" assert fns != DNaV, 'you need to provide fns' assert lmax != DNaV, "need to provide lmax" assert spin != DNaV, "need to provide spin" assert nlev != DNaV, "need to provide nlev" assert np.all(transfunction != DNaV), "need to provide transfunction" self.len_lib = Xsky(unl_lib=unl_lib, lmax=lmax, libdir=libdir, fns=fns, space=space, spin=spin, epsilon=epsilon, geominfo=geominfo, lenjob_geominfo=lenjob_geominfo) if len_lib == DNaV else len_lib self.obs_lib = Xobs(len_lib=self.len_lib, space=space, transfunction=transfunction, lmax=lmax, nlev=nlev, noise_lib=noise_lib, libdir_noise=libdir_noise, fnsnoise=fnsnoise, geominfo=geominfo, libdir_suffix=libdir_suffix) self.noise_lib = self.obs_lib.noise_lib self.libdir = self.len_lib.libdir self.fns = self.len_lib.fns if flavour == 'unl': assert libdir != DNaV, "need to provide libdir" assert fns != DNaV, 'you need to provide fns' assert lmax != DNaV, "need to provide lmax" assert spin != DNaV, "need to provide spin" assert nlev != DNaV, "need to provide nlev" assert np.all(transfunction != DNaV), "need to provide transfunction" if libdir_phi != DNaV: assert phi_field != DNaV, "need to provide phi_field" assert fnsP != DNaV, "need to provide fnsP" assert phi_lmax != DNaV, "need to provide phi_lmax" assert phi_space != DNaV, "need to provide phi_space" self.unl_lib = Xunl(lmax=lmax, libdir=libdir, fns=fns, fnsP=fnsP, phi_field=phi_field, libdir_phi=libdir_phi, space=space, phi_space=phi_space, phi_lmax=phi_lmax, geominfo=geominfo, spin=spin) if unl_lib == DNaV else unl_lib elif libdir_phi == DNaV: assert phi_fn != DNaV, "need to provide phi_fn" assert phi_lmax != DNaV, "need to provide phi_lmax" assert phi_field != DNaV, "need to provide phi_field" assert phi_space == 'cl', "please set phi_space='cl', just to be sure." self.cls_lib = Cls(phi_lmax=phi_lmax, phi_fn=phi_fn, phi_field=phi_field) self.unl_lib = Xunl(cls_lib=self.cls_lib, lmax=lmax, libdir=libdir, fns=fns, phi_field=phi_field, space=space, phi_space=phi_space, phi_lmax=phi_lmax, geominfo=geominfo, spin=spin) if unl_lib == DNaV else unl_lib self.len_lib = Xsky(unl_lib=self.unl_lib, lmax=lmax, space=space, epsilon=epsilon, geominfo=geominfo, lenjob_geominfo=lenjob_geominfo) self.obs_lib = Xobs(len_lib=self.len_lib, transfunction=transfunction, lmax=lmax, nlev=nlev, noise_lib=noise_lib, libdir_noise=libdir_noise, fnsnoise=fnsnoise, space=space, geominfo=geominfo, libdir_suffix=libdir_suffix) self.noise_lib = self.obs_lib.noise_lib self.libdir = self.unl_lib.libdir self.fns = self.unl_lib.fns elif space in ['alm']: self.spin = 0 # there are genrally no qlms, ulms, therefore here we can safely assume that data is spin0 spin = 0 if flavour == 'obs': if libdir != DNaV: self.libdir = libdir if fns == DNaV: assert 0, 'you need to provide fns' self.fns = fns self.obs_lib = Xobs(maps=maps, space=space, transfunction=transfunction, lmax=lmax, libdir=libdir, fns=fns, spin=self.spin, geominfo=geominfo, libdir_suffix=libdir_suffix) if obs_lib == DNaV else obs_lib self.noise_lib = self.obs_lib.noise_lib self.libdir = self.obs_lib.libdir self.fns = self.obs_lib.fns if flavour == 'sky': assert 0, 'implement if needed' if flavour == 'unl': if (libdir_phi == DNaV or libdir == DNaV) and cls_lib == DNaV: cls_lib = Cls(lmax=lmax, CMB_fn=CMB_fn, phi_fn=phi_fn, phi_field=phi_field) self.cls_lib = cls_lib # just to be safe.. self.unl_lib = Xunl(lmax=lmax, libdir=libdir, fns=fns, fnsP=fnsP, phi_field=phi_field, libdir_phi=libdir_phi, space=space, phi_space=phi_space, cls_lib=cls_lib, geominfo=geominfo, spin=self.spin) if unl_lib == DNaV else unl_lib self.len_lib = Xsky(unl_lib=self.unl_lib, lmax=lmax, space=space, epsilon=epsilon, geominfo=geominfo, lenjob_geominfo=lenjob_geominfo) self.obs_lib = Xobs(len_lib=self.len_lib, transfunction=transfunction, lmax=lmax, nlev=nlev, noise_lib=noise_lib, libdir_noise=libdir_noise, fnsnoise=fnsnoise, space=space, spin=self.spin, geominfo=geominfo, libdir_suffix=libdir_suffix) self.noise_lib = self.obs_lib.noise_lib self.libdir = self.unl_lib.libdir self.fns = self.unl_lib.fns elif space == 'cl': self.spin = 0 # there are genrally no qcls, ucls, therefore here we can safely assume that data is spin0 spin = 0 if flavour == 'obs': assert 0, 'implement if needed' # unlikely this will ever be needed if flavour == 'sky': assert 0, 'implement if needed' # unlikely this will ever be needed if flavour == 'unl': assert np.all(transfunction != DNaV), "need to provide transfunction" assert lmax != DNaV, "need to provide lmax" assert nlev != DNaV, "need to provide nlev" assert phi_lmax != DNaV, "need to provide phi_lmax" if phi_fn != DNaV: assert phi_field != DNaV, "need to provide phi_field" self.cls_lib = Cls(lmax=lmax, phi_lmax=phi_lmax, CMB_fn=CMB_fn, phi_fn=phi_fn, phi_field=phi_field) self.unl_lib = Xunl(cls_lib=self.cls_lib, lmax=lmax, fnsP=fnsP, phi_field=phi_field, libdir_phi=libdir_phi, phi_space=phi_space, geominfo=geominfo) self.len_lib = Xsky(unl_lib=self.unl_lib, lmax=lmax, epsilon=epsilon, geominfo=geominfo, lenjob_geominfo=lenjob_geominfo) self.obs_lib = Xobs(len_lib=self.len_lib, transfunction=transfunction, lmax=lmax, nlev=nlev, noise_lib=noise_lib, libdir_noise=libdir_noise, fnsnoise=fnsnoise, geominfo=geominfo, cacher=cacher, libdir_suffix=libdir_suffix) self.noise_lib = self.obs_lib.noise_lib self.libdir = DNaV # settings this here explicit for a future me, so I see it easier self.fns = DNaV # settings this here explicit for a future me, so I see it easier self.geominfo = self.obs_lib.geominfo # Sim_generator() needs this. I let obs_lib decide the final geominfo. def get_sim_sky(self, simidx, space, field, spin): return self.len_lib.get_sim_sky(simidx=simidx, space=space, field=field, spin=spin) def get_sim_unl(self, simidx, space, field, spin): return self.unl_lib.get_sim_unl(simidx=simidx, space=space, field=field, spin=spin) def get_sim_obs(self, simidx, space, field, spin): return self.obs_lib.get_sim_obs(simidx=simidx, space=space, field=field, spin=spin) def get_sim_noise(self, simidx, space, field, spin=2): return self.noise_lib.get_sim_noise(simidx, spin=spin, space=space, field=field) def get_sim_phi(self, simidx, space): return self.unl_lib.get_sim_phi(simidx=simidx, space=space) def purgecache(self): print('sims_lib: purging cachers to release memory') libs = ['obs_lib', 'noise_lib', 'unl_lib', 'len_lib'] for lib in libs: if lib in self.__dict__: if len(list(self.obs_lib.cacher._cache.keys())) > 0: for key in np.copy(list(self.obs_lib.cacher._cache.keys())): self.obs_lib.cacher.remove(key) def isdone(self, simidx, field, spin, space='map', flavour='obs'): fn = '{}_space{}_spin{}_field{}_{}'.format(flavour, space, spin, field, simidx) if self.obs_lib.cacher.is_cached(fn): return True if field == 'polarization': # print(opj(self.libdir, self.fns['Q'].format(simidx))) # print(opj(self.libdir, self.fns['U'].format(simidx))) if self.libdir != DNaV and self.fns != DNaV: if os.path.exists(opj(self.libdir, self.fns['Q'].format(simidx))) and os.path.exists(opj(self.libdir, self.fns['U'].format(simidx))): return True if field == 'temperature': # print(opj(self.libdir, self.fns['T'].format(simidx))) if self.libdir != DNaV and self.fns != DNaV: if os.path.exists(opj(self.libdir, self.fns['T'].format(simidx))): return True return False # compatibility with Plancklens def hashdict(self): return {} # compatibility with Plancklens def get_sim_tmap(self, simidx): return self.get_sim_obs(simidx=simidx, space='map', field='temperature', spin=0) # compatibility with Plancklens def get_sim_pmap(self, simidx): return self.get_sim_obs(simidx=simidx, space='map', field='polarization', spin=2)
class anafast_clone(): def __init__(self, geominfo): self.nside = geominfo[1]['nside'] self.npixel = hp.nside2npix(self.nside) def map2alm(self, map, lmax, mmax, nthreads): return hp.map2alm(map, lmax=lmax) def map2alm_spin(self, map, spin, lmax, mmax, nthreads): return hp.map2alm_spin(map, lmax=lmax, spin=spin) def alm2map(self, alm, lmax, mmax, nthreads): return hp.alm2map(alm, lmax=lmax, nside=self.nside) def alm2map_spin(self, alm, spin, lmax, mmax, nthreads): return hp.alm2map_spin(alm, lmax=lmax, spin=spin, nside=self.nside) def npix(self): return self.npixel def get_geom(geominfo): return lp_get_geom(geominfo) # return anafast_clone(geominfo)