Source code for delensalot.core.handler

#!/usr/bin/env python

"""handler.py: This module collects the delensalot jobs. It receives the delensalot model build for the respective job. They all initialize needed modules and directories, collect the computing-jobs, and run the computing-jobs, with MPI support, if available.
    
"""
import os
from os.path import join as opj
import hashlib

import numpy as np
import healpy as hp
import hashlib

import logging
log = logging.getLogger(__name__)
log.setLevel(logging.INFO)

from logdecorator import log_on_start, log_on_end
import datetime, getpass, copy, importlib

from plancklens import qresp, qest, qecl, utils
from plancklens.qcinv import opfilt_pp
from plancklens.filt import filt_util, filt_cinv, filt_simple

from delensalot.utils import read_map
from delensalot.utility import utils_qe, utils_sims
from delensalot.utility.utils_hp import Alm, almxfl, alm_copy, gauss_beam

from delensalot.config.visitor import transform, transform3d
from delensalot.config.config_helper import data_functions as df
from delensalot.config.metamodel import DEFAULT_NotAValue
from delensalot.config.metamodel.dlensalot_mm import DLENSALOT_Concept

from delensalot.core import mpi
from delensalot.core.mpi import check_MPI
from delensalot.core.opfilt.opfilt_handler import QE_transformer, MAP_transformer
from delensalot.core.iterator.iteration_handler import iterator_transformer
from delensalot.core.iterator.statics import rec as rec
from delensalot.core.decorator.exception_handler import base as base_exception_handler
from delensalot.core.opfilt import utils_cinv_p as cinv_p_OBD
from delensalot.core.opfilt.bmodes_ninv import template_bfilt

def get_dirname(s):
    return s.replace('(', '').replace(')', '').replace('{', '').replace('}', '').replace(' ', '').replace('\'', '').replace('\"', '').replace(':', '_').replace(',', '_').replace('[', '').replace(']', '')

def dict2roundeddict(d):
    s = ''
    for k,v in d.items():
        d[k] = np.around(v,3)
    return d
class Basejob():
    """
    Base class for all jobs, i.e. convenience functions go in here as they should be accessible from anywhere
    """
    def __str__(self):
        _str = ''
        for key, val in self.__dict__.items():
            keylen = len(str(key))
            if type(val) in [list, np.ndarray, np.array, dict]:
                _str += '{}:'.format(key)+(20-keylen)*' '+'\t{}'.format(type(val))
            else:
                _str += '{}:'.format(key)+(20-keylen)*' '+'\t{}'.format(val)
            _str += '\n'
        return _str


    def __init__(self, model):
        self.__dict__.update(model.__dict__)

        self.libdir_QE = opj(self.TEMP, 'QE')
        if not os.path.exists(self.libdir_QE):
            os.makedirs(self.libdir_QE)
        self.libdir_MAP = lambda qe_key, simidx, version: opj(self.TEMP, 'MAP/%s'%(qe_key), 'sim%04d'%(simidx) + version)
        for simidx in self.simidxs:
            libdir_MAPidx = self.libdir_MAP(self.k, simidx, self.version)
            if not os.path.exists(libdir_MAPidx):
                os.makedirs(libdir_MAPidx)
            self.libdir_blt = opj(libdir_MAPidx, 'BLT/')
            if not os.path.exists(self.libdir_blt):
                os.makedirs(self.libdir_blt)
         
        self.config_model = model
        self.jobs = []


    # @base_exception_handler
    #@log_on_start(logging.INFO, "collect_jobs() started")
    #@log_on_end(logging.INFO, "collect_jobs() finished: jobs={self.jobs}")
    def collect_jobs(self):

        assert 0, "Overwrite"


    # @base_exception_handler
    #@log_on_start(logging.INFO, "collect_jobs() started")
    #@log_on_end(logging.INFO, "collect_jobs() finished")
    def run(self):

        assert 0, "Implement if needed"


    # @base_exception_handler
    #@log_on_start(logging.INFO, "collect_jobs() started")
    #@log_on_end(logging.INFO, "collect_jobs() finished")
    def get_qlm_it(self, simidx, it):

        assert 0, "Implement if needed"


    # @base_exception_handler
    #@log_on_start(logging.INFO, "collect_jobs() started")
    #@log_on_end(logging.INFO, "collect_jobs() finished")
    def get_plm_it(self, simidx, its):

        plms = rec.load_plms(self.libdir_MAP(self.k, simidx, self.version), its)

        return plms


    # @base_exception_handler
    #@log_on_start(logging.INFO, "collect_jobs() started")
    #@log_on_end(logging.INFO, "collect_jobs() finished")
    def get_mf_it(self, simidx, it, normalized=True):

        assert 0, "Implement if needed"


    # @base_exception_handler
    #@log_on_start(logging.INFO, "collect_jobs() started")
    #@log_on_end(logging.INFO, "collect_jobs() finished")
    def get_blt_it(self, simidx, it):
        if self.data_from_CFS:
            # TODO probably enough to just check if libdir_blt_MAP_CFS is empty
            assert 0, 'implement if needed'
            fn_blt = self.libdir_blt_MAP_CFS(self.k, simidx, self.version)
        else:
            if it == 0:
                fn_blt = opj(self.libdir_blt, 'blt_%s_%04d_p%03d_e%03d_lmax%s'%(self.k, simidx, 0, 0, self.lm_max_blt[0]) + 'perturbative' * self.blt_pert + '.npy')
            elif it >0:
                fn_blt = opj(self.libdir_blt, 'blt_%s_%04d_p%03d_e%03d_lmax%s'%(self.k, simidx, it, it, self.lm_max_blt[0]) + '.npy')
        return np.load(fn_blt)
    

    # @base_exception_handler
    #@log_on_start(logging.INFO, "collect_jobs() started")
    #@log_on_end(logging.INFO, "collect_jobs() finished")
    def get_ivf(self, simidx, it, field):

        assert 0, "Implement if needed"


    # @base_exception_handler
    #@log_on_start(logging.INFO, "collect_jobs() started")
    #@log_on_end(logging.INFO, "collect_jobs() finished")
    def get_wf(self, simidx, it, field):

        assert 0, "Implement if needed"
    

    # @base_exception_handler
    #@log_on_start(logging.INFO, "collect_jobs() started")
    #@log_on_end(logging.INFO, "collect_jobs() finished")
    def get_fiducial_sim(self, simidx, field):
        """_summary_
        """   
        assert 0, "Implement if needed"


    #@log_on_start(logging.INFO, "get_filter() started")
    #@log_on_end(logging.INFO, "get_filter() finished")
    def get_filter(self): 
        assert 0, 'overwrite'


[docs]class OBD_builder(Basejob): """OBD matrix builder Job. Calculates the OBD matrix, used to correctly deproject the B-modes at a masked sky. """ @check_MPI def __init__(self, OBD_model, diasable_mpi=False): self.__dict__.update(OBD_model.__dict__) b_transf = gauss_beam(df.a2r(self.beam), lmax=self.lmax) self.nivp = np.array(opfilt_pp.alm_filter_ninv(self.nivp_desc, b_transf, marge_qmaps=(), marge_umaps=()).get_ninv()) # @base_exception_handler #@log_on_start(logging.INFO, "collect_jobs() started") #@log_on_end(logging.INFO, "collect_jobs() finished") def collect_jobs(self): # This fakes the collect/run structure, as bpl takes care of MPI jobs = [1] self.jobs = jobs return jobs # @base_exception_handler #@log_on_start(logging.INFO, "run() started") #@log_on_end(logging.INFO, "run() finished") def run(self): # This fakes the collect/run structure, as bpl takes care of MPI for job in self.jobs: bpl = template_bfilt(self.lmin_b, self.nivjob_geomlib, self.tr, _lib_dir=self.libdir) if not os.path.exists(self.libdir+ '/tnit.npy'): bpl._get_rows_mpi(self.nivp, prefix='') mpi.barrier() if mpi.rank == 0: if not os.path.exists(self.libdir+ '/tnit.npy'): tnit = bpl._build_tnit() np.save(self.libdir+ '/tnit.npy', tnit) else: tnit = np.load(self.libdir+ '/tnit.npy') if not os.path.exists(self.libdir+ '/tniti.npy'): log.info('inverting') tniti = np.linalg.inv(tnit + np.diag((1. / (self.nlev_dep / 180. / 60. * np.pi) ** 2) * np.ones(tnit.shape[0]))) np.save(self.libdir+ '/tniti.npy', tniti) readme = '{}: tniti.npy. created from user {} using lerepi/delensalot with the following settings: {}'.format(getpass.getuser(), datetime.date.today(), self.__dict__) with open(self.libdir+ '/README.txt', 'w') as f: f.write(readme) else: log.info('Matrix already created') mpi.barrier()
[docs]class Sim_generator(Basejob): """Simulation generation Job. Generates simulations for the requested configuration. * If any libdir exists, then a flavour of data is provided. Therefore, can only check by making sure flavour == obs, and fns exist. """ def __init__(self, dlensalot_model): """ In this init we make the following decision: * (1) Does user provide obs data? Then Sim_generator can be fully skipped * (2) Otherwise, check if files already generated (of course, delensalot model may not know this), * If so, update the simhandler * If not, * generate the simulations * update the simhandler """ super().__init__(dlensalot_model) if self.simulationdata.flavour == 'obs' or np.all(self.simulationdata.obs_lib.maps != DEFAULT_NotAValue): # (1) # Here, obs data is provided and nothing needs to be generated if np.all(self.simulationdata.obs_lib.maps != DEFAULT_NotAValue): log.info('Will use data provided in memory') else: log.info('Will use obs data stored at {} with filenames {}'.format(self.simulationdata.libdir, str(self.simulationdata.fns))) else: if self.simulationdata.flavour == 'sky': # Here, sky data is provided and obs needs to be generated self.libdir_sky = self.simulationdata.libdir self.fns_sky = self.simulationdata.fns lenjob_geomstr = 'unknown_lensinggeometry' else: hlib = hashlib.sha256() hlib.update(str(self.simulationdata.transfunction).encode()) transfunctioncode = hlib.hexdigest()[:4] # some flavour provided, and we need to generate the sky and obs maps from this. lenjob_geomstr = get_dirname(str(self.simulationdata.len_lib.lenjob_geominfo)) self.libdir_suffix = 'generic' if self.libdir_suffix == '' else self.libdir_suffix self.libdir_sky = opj(os.environ['SCRATCH'], 'simulation/', self.libdir_suffix, get_dirname(str(self.simulationdata.geominfo)), lenjob_geomstr) self.fns_sky = self.set_basename_sky() self.fnsP = 'philm_{}.npy' self.libdir_suffix = 'generic' if self.libdir_suffix == '' else self.libdir_suffix nlev_round = dict2roundeddict(self.simulationdata.nlev) self.libdir = opj(os.environ['SCRATCH'], 'simulation/', self.libdir_suffix, get_dirname(str(self.simulationdata.geominfo)), get_dirname(lenjob_geomstr), get_dirname(str(sorted(nlev_round.items()))), '{}'.format(transfunctioncode)) # self.fns = self.set_basename_obs() first_rank = mpi.bcast(mpi.rank) if first_rank == mpi.rank: if not os.path.exists(self.libdir): os.makedirs(self.libdir) for n in range(mpi.size): if n != mpi.rank: mpi.send(1, dest=n) else: mpi.receive(None, source=mpi.ANY_SOURCE) simidxs_ = np.array(list(set(np.concatenate([self.simidxs, self.simidxs_mf]))), dtype=int) check_ = True for simidx in simidxs_: # (2) if self.k in ['p_p', 'p_eb', 'peb', 'p_be', 'pee']: if not (os.path.exists(opj(self.libdir, self.fns['E'].format(simidx))) and os.path.exists(opj(self.libdir, self.fns['B'].format(simidx)))): check_ = False break elif self.k in ['ptt']: if not os.path.exists(opj(self.libdir, self.fns['T'].format(simidx))): check_ = False break elif self.k in ['p']: if not (os.path.exists(opj(self.libdir, self.fns['T'].format(simidx))) and os.path.exists(opj(self.libdir, self.fns['E'].format(simidx))) and os.path.exists(opj(self.libdir, self.fns['B'].format(simidx)))): check_ = False break if check_: # (3) self.postrun_obs() log.info('Will use obs data at {} with filenames {}'.format(self.libdir, str(self.fns))) else: log.info('obs data will be stored at {} with filenames {}'.format(self.libdir, str(self.fns))) if self.simulationdata.flavour != 'sky': # if data is below sky, I need to check. otherwise we know they exist check_ = True for simidx in simidxs_: # (2) if self.k in ['p_p', 'p_eb', 'peb', 'p_be', 'pee']: if not (os.path.exists(opj(self.libdir_sky, self.fns_sky['E'].format(simidx))) and os.path.exists(opj(self.libdir_sky, self.fns_sky['B'].format(simidx)))): check_ = False break elif self.k in ['ptt']: if not os.path.exists(opj(self.libdir_sky, self.fns_sky['T'].format(simidx))): check_ = False break elif self.k in ['p']: if not (os.path.exists(opj(self.libdir_sky, self.fns_sky['T'].format(simidx))) and os.path.exists(opj(self.libdir_sky, self.fns_sky['E'].format(simidx))) and os.path.exists(opj(self.libdir_sky, self.fns_sky['B'].format(simidx)))): check_ = False break if check_: # (3) self.postrun_sky() log.info('Will use sky data at {} with filenames {}'.format(self.libdir_sky, str(self.fns_sky))) else: log.info('sky data will be stored at {} with filenames {}'.format(self.libdir_sky, str(self.fns_sky))) def set_basename_sky(self): if self.k in ['p_p', 'p_eb', 'peb', 'p_be', 'pee']: fns_sky = {'E': 'Ealmsky_{}.npy', 'B': 'Balmsky_{}.npy'} elif self.k in ['ptt']: fns_sky = {'T': 'Talmsky_{}.npy'} elif self.k in ['p']: fns_sky = {'T': 'Talmsky_{}.npy', 'E': 'Ealmsky_{}.npy', 'B': 'Balmsky_{}.npy'} return fns_sky def set_basename_obs(self): if self.k in ['p_p', 'p_eb', 'peb', 'p_be', 'pee']: fns = {'E': 'Ealmobs_{}.npy', 'B': 'Balmobs_{}.npy'} elif self.k in ['ptt']: fns = {'T': 'Talmobs_{}.npy'} elif self.k in ['p']: fns = {'T': 'Talmobs_{}.npy', 'E': 'Ealmobs_{}.npy', 'B': 'Balmobs_{}.npy'} return fns # @base_exception_handler @log_on_start(logging.INFO, "Sim.collect_jobs() started") @log_on_end(logging.INFO, "Sim.collect_jobs() finished: jobs={self.jobs}") def collect_jobs(self): jobs = list(range(len(['generate_sky', 'generate_obs']))) if np.all(self.simulationdata.maps == DEFAULT_NotAValue) and self.simulationdata.flavour != 'obs': for taski, task in enumerate(['generate_sky', 'generate_obs']): _jobs = [] simidxs_ = np.array(list(set(np.concatenate([self.simidxs, self.simidxs_mf]))), dtype=int) # print(self.simidxs, self.simidxs.dtype, self.simidxs_mf, simidxs_) if task == 'generate_sky': for simidx in simidxs_: if self.k in ['p_p', 'p_eb', 'peb', 'p_be', 'pee']: fnQ = opj(self.libdir_sky, self.fns_sky['E'].format(simidx)) fnU = opj(self.libdir_sky, self.fns_sky['B'].format(simidx)) if not os.path.isfile(fnQ) or not os.path.isfile(fnU) or not os.path.exists(opj(self.libdir_sky, self.fnsP.format(simidx))): _jobs.append(simidx) elif self.k in ['ptt']: fnT = opj(self.libdir_sky, self.fns_sky['T'].format(simidx)) if not os.path.isfile(fnT) or not os.path.exists(opj(self.libdir_sky, self.fnsP.format(simidx))): _jobs.append(simidx) elif self.k in ['p']: fnT = opj(self.libdir_sky, self.fns_sky['T'].format(simidx)) fnQ = opj(self.libdir_sky, self.fns_sky['E'].format(simidx)) fnU = opj(self.libdir_sky, self.fns_sky['B'].format(simidx)) if not os.path.isfile(fnT) or not os.path.isfile(fnQ) or not os.path.isfile(fnU) or not os.path.exists(opj(self.libdir_sky, self.fnsP.format(simidx))): _jobs.append(simidx) if task == 'generate_obs': for simidx in simidxs_: if self.k in ['p_p', 'p_eb', 'peb', 'p_be', 'pee']: fnQ = opj(self.libdir, self.fns['E'].format(simidx)) fnU = opj(self.libdir, self.fns['B'].format(simidx)) if not os.path.isfile(fnQ) or not os.path.isfile(fnU): _jobs.append(simidx) elif self.k in ['ptt']: fnT = opj(self.libdir, self.fns['T'].format(simidx)) if not os.path.isfile(fnT): _jobs.append(simidx) elif self.k in ['p']: fnT = opj(self.libdir, self.fns['T'].format(simidx)) fnQ = opj(self.libdir, self.fns['E'].format(simidx)) fnU = opj(self.libdir, self.fns['B'].format(simidx)) if not os.path.isfile(fnT) or not os.path.isfile(fnQ) or not os.path.isfile(fnU): _jobs.append(simidx) jobs[taski] = _jobs self.jobs = jobs else: self.jobs = [[],[]] return self.jobs # @base_exception_handler @log_on_start(logging.INFO, "Sim.run() started") @log_on_end(logging.INFO, "Sim.run() finished") def run(self): for taski, task in enumerate(['generate_sky', 'generate_obs']): for simidx in self.jobs[taski][mpi.rank::mpi.size]: log.info("rank {} (size {}) generating sim {}".format(mpi.rank, mpi.size, simidx)) if task == 'generate_sky': self.generate_sky(simidx) if task == 'generate_obs': self.generate_obs(simidx) if self.simulationdata.obs_lib.maps == DEFAULT_NotAValue: self.simulationdata.purgecache() log.info("rank {} (size {}) generated sim {}".format(mpi.rank, mpi.size, simidx)) if np.all(self.simulationdata.maps == DEFAULT_NotAValue): self.postrun_sky() self.postrun_obs() #@log_on_start(logging.INFO, "Sim.generate_sim(simidx={simidx}) started") #@log_on_end(logging.INFO, "Sim.generate_sim(simidx={simidx}) finished") def generate_sky(self, simidx): if not os.path.exists(opj(self.libdir_sky, self.fnsP.format(simidx))): phi = self.simulationdata.get_sim_phi(simidx, space='alm') np.save(opj(self.libdir_sky, self.fnsP.format(simidx)), phi) if self.k in ['p_p', 'p_eb', 'peb', 'p_be', 'pee']: if not (os.path.exists(opj(self.libdir_sky, self.fns_sky['E'].format(simidx))) and os.path.exists(opj(self.libdir_sky, self.fns_sky['B'].format(simidx)))): EBsky = self.simulationdata.get_sim_sky(simidx, spin=0, space='alm', field='polarization') np.save(opj(self.libdir_sky, self.fns_sky['E'].format(simidx)), EBsky[0]) np.save(opj(self.libdir_sky, self.fns_sky['B'].format(simidx)), EBsky[1]) elif self.k in ['ptt']: if not os.path.exists(opj(self.libdir_sky, self.fns_sky['T'].format(simidx))): Tsky = self.simulationdata.get_sim_sky(simidx, spin=0, space='alm', field='temperature') np.save(opj(self.libdir_sky, self.fns_sky['T'].format(simidx)), Tsky) elif self.k in ['p']: if not (os.path.exists(opj(self.libdir_sky, self.fns_sky['E'].format(simidx))) and os.path.exists(opj(self.libdir_sky, self.fns_sky['B'].format(simidx)))): EBsky = self.simulationdata.get_sim_sky(simidx, spin=0, space='alm', field='polarization') np.save(opj(self.libdir_sky, self.fns_sky['E'].format(simidx)), EBsky[0]) np.save(opj(self.libdir_sky, self.fns_sky['B'].format(simidx)), EBsky[1]) if not os.path.exists(opj(self.libdir_sky, self.fns_sky['T'].format(simidx))): Tsky = self.simulationdata.get_sim_sky(simidx, spin=0, space='alm', field='temperature') np.save(opj(self.libdir_sky, self.fns_sky['T'].format(simidx)), Tsky) #@log_on_start(logging.INFO, "Sim.generate_sim(simidx={simidx}) started") #@log_on_end(logging.INFO, "Sim.generate_sim(simidx={simidx}) finished") def generate_obs(self, simidx): if self.k in ['p_p', 'p_eb', 'peb', 'p_be', 'pee']: if not (os.path.exists(opj(self.libdir, self.fns['E'].format(simidx))) and os.path.exists(opj(self.libdir, self.fns['B'].format(simidx)))): EBobs = self.simulationdata.get_sim_obs(simidx, spin=0, space='alm', field='polarization') np.save(opj(self.libdir, self.fns['E'].format(simidx)), EBobs[0]) np.save(opj(self.libdir, self.fns['B'].format(simidx)), EBobs[1]) elif self.k in ['ptt']: if not os.path.exists(opj(self.libdir, self.fns['T'].format(simidx))): Tobs = self.simulationdata.get_sim_obs(simidx, spin=0, space='alm', field='temperature') np.save(opj(self.libdir, self.fns['T'].format(simidx)), Tobs) elif self.k in ['p']: if not (os.path.exists(opj(self.libdir, self.fns['E'].format(simidx))) and os.path.exists(opj(self.libdir, self.fns['B'].format(simidx)))): EBobs = self.simulationdata.get_sim_obs(simidx, spin=0, space='alm', field='polarization') np.save(opj(self.libdir, self.fns['E'].format(simidx)), EBobs[0]) np.save(opj(self.libdir, self.fns['B'].format(simidx)), EBobs[1]) if not os.path.exists(opj(self.libdir, self.fns['T'].format(simidx))): Tobs = self.simulationdata.get_sim_obs(simidx, spin=0, space='alm', field='temperature') np.save(opj(self.libdir, self.fns['T'].format(simidx)), Tobs) def postrun_obs(self): # we always enter postrun, even from other jobs (like QE_lensrec). So making sure we are not accidently overwriting libdirs and fns if self.simulationdata.flavour != 'sky' and self.simulationdata.flavour != 'obs' and np.all(self.simulationdata.obs_lib.maps == DEFAULT_NotAValue): self.simulationdata.libdir = self.libdir self.simulationdata.fns = self.fns if self.simulationdata.flavour != 'obs': self.simulationdata.obs_lib.fns = self.fns self.simulationdata.obs_lib.libdir = self.libdir self.simulationdata.obs_lib.space = 'alm' self.simulationdata.obs_lib.spin = 0 def postrun_sky(self): # we always enter postrun, even from other jobs (like QE_lensrec). So making sure we are not accidently overwriting libdirs and fns if self.simulationdata.flavour != 'sky' and self.simulationdata.flavour != 'obs' and np.all(self.simulationdata.obs_lib.maps == DEFAULT_NotAValue): self.simulationdata.len_lib.fns = self.fns_sky self.simulationdata.len_lib.libdir = self.libdir_sky self.simulationdata.len_lib.space = 'alm' self.simulationdata.len_lib.spin = 0 self.simulationdata.unl_lib.libdir_phi = self.libdir_sky self.simulationdata.unl_lib.fnsP = self.fnsP self.simulationdata.unl_lib.phi_field = 'potential' self.simulationdata.unl_lib.phi_space = 'alm' # we always safe phi as lm's
[docs]class QE_lr(Basejob): """Quadratic estimate lensing reconstruction Job. Performs tasks such as lensing reconstruction, mean-field calculation, and B-lensing template calculation. """ @check_MPI def __init__(self, dlensalot_model, caller=None): if caller is not None: dlensalot_model.qe_tasks = dlensalot_model.it_tasks ## TODO. Current solution to fake an iteration handler for QE to calc blt is to initialize one MAP_job here. ## In the future, I want to remove get_template_blm from the iteration_handler for QE. if 'calc_blt' in dlensalot_model.qe_tasks: self.MAP_job = caller super().__init__(dlensalot_model) self.dlensalot_model = dlensalot_model self.simgen = Sim_generator(dlensalot_model) self.simulationdata = self.simgen.simulationdata # self.filter_ = transform(self.configfile.dlensalot_model, opfilt_handler_QE()) if self.qe_filter_directional == 'isotropic': self.ivfs = filt_simple.library_fullsky_sepTP(opj(self.libdir_QE, 'ivfs'), self.simulationdata, self.nivjob_geominfo[1]['nside'], self.ttebl, self.cls_len, self.ftebl_len['t'], self.ftebl_len['e'], self.ftebl_len['b'], cache=True) if self.qlm_type == 'sepTP': self.qlms_dd = qest.library_sepTP(opj(self.libdir_QE, 'qlms_dd'), self.ivfs, self.ivfs, self.cls_len['te'], self.nivjob_geominfo[1]['nside'], lmax_qlm=self.lm_max_qlm[0]) elif self.qe_filter_directional == 'anisotropic': ## Wait for finished run(), as plancklens triggers cinv_calc... # FIXME index must be the calc_phi-task index if len(self.collect_jobs()[0]) == 0: self.init_aniso_filter() self.mf = lambda simidx: self.get_meanfield(int(simidx)) self.plm = lambda simidx: self.get_plm(simidx, self.QE_subtract_meanfield) self.R_unl = lambda: qresp.get_response(self.k, self.lm_max_ivf[0], self.k[0], self.cls_unl, self.cls_unl, self.ftebl_unl, lmax_qlm=self.lm_max_qlm[0])[0] ## Faking here sims_MAP for calc_blt as it needs iteration_handler if 'calc_blt' in self.qe_tasks: if self.it_filter_directional == 'anisotropic': # TODO reimplement ztrunc self.sims_MAP = utils_sims.ztrunc_sims(self.simulationdata, self.nivjob_geominfo[1]['nside'], [self.zbounds]) elif self.it_filter_directional == 'isotropic': self.sims_MAP = self.simulationdata if self.cl_analysis == True: # TODO fix numbers for mc correction and total nsims self.ss_dict = { k : v for k, v in zip( np.concatenate( [ range(i*60, (i+1)*60) for i in range(0,5) ] ), np.concatenate( [ np.roll( range(i*60, (i+1)*60), -1 ) for i in range(0,5) ] ) ) } self.ds_dict = { k : -1 for k in range(300)} self.ivfs_d = filt_util.library_shuffle(self.ivfs, self.ds_dict) self.ivfs_s = filt_util.library_shuffle(self.ivfs, self.ss_dict) self.qlms_ds = qest.library_sepTP(opj(self.libdir_QE, 'qlms_ds'), self.ivfs, self.ivfs_d, self.cls_len['te'], self.nivjob_geominfo[1]['nside'], lmax_qlm=self.lm_max_qlm[0]) self.qlms_ss = qest.library_sepTP(opj(self.libdir_QE, 'qlms_ss'), self.ivfs, self.ivfs_s, self.cls_len['te'], self.nivjob_geominfo[1]['nside'], lmax_qlm=self.lm_max_qlm[0]) self.mc_sims_bias = np.arange(60, dtype=int) self.mc_sims_var = np.arange(60, 300, dtype=int) self.qcls_ds = qecl.library(opj(self.libdir_QE, 'qcls_ds'), self.qlms_ds, self.qlms_ds, np.array([])) # for QE RDN0 calculations self.qcls_ss = qecl.library(opj(self.libdir_QE, 'qcls_ss'), self.qlms_ss, self.qlms_ss, np.array([])) # for QE RDN0 / MCN0 calculations self.qcls_dd = qecl.library(opj(self.libdir_QE, 'qcls_dd'), self.qlms_dd, self.qlms_dd, self.mc_sims_bias) # FIXME currently only used for testing filter integration. These QE filter are not used for QE reoconstruction, but will be in the near future when Plancklens dependency is dropped. if self.k in ['p_p', 'p_eb', 'peb', 'p_be', 'pee', 'ptt']: self.filter = self.get_filter() def init_cinv(self): self.cinv_t = filt_cinv.cinv_t(opj(self.libdir_QE, 'cinv_t'), self.lm_max_ivf[0], self.nivjob_geominfo[1]['nside'], self.cls_len, self.ttebl['t'], self.nivt_desc, marge_monopole=True, marge_dipole=True, marge_maps=[]) # FIXME is this right? what if analysis includes pixelwindow function? transf_elm_loc = gauss_beam(self.beam / 180 / 60 * np.pi, lmax=self.lm_max_ivf[0]) if self.OBD: self.cinv_p = cinv_p_OBD.cinv_p(opj(self.libdir_QE, 'cinv_p'), self.lm_max_ivf[0], self.nivjob_geominfo[1]['nside'], self.cls_len, transf_elm_loc[:self.lm_max_ivf[0]+1], self.nivp_desc, geom=self.nivjob_geomlib, chain_descr=self.chain_descr(self.lm_max_ivf[0], self.cg_tol), bmarg_lmax=self.lmin_teb[2], zbounds=self.zbounds, _bmarg_lib_dir=self.obd_libdir, _bmarg_rescal=self.obd_rescale, sht_threads=self.tr) else: self.cinv_p = filt_cinv.cinv_p(opj(self.TEMP, 'cinv_p'), self.lm_max_ivf[0], self.nivjob_geominfo[1]['nside'], self.cls_len, self.ttebl['e'], self.nivp_desc, chain_descr=self.chain_descr(self.lm_max_ivf[0], self.cg_tol), transf_blm=self.ttebl['b'], marge_qmaps=(), marge_umaps=()) # @base_exception_handler #@log_on_start(logging.INFO, "QE.collect_jobs(recalc={recalc}) started") #@log_on_end(logging.INFO, "QE.collect_jobs(recalc={recalc}) finished: jobs={self.jobs}") def collect_jobs(self, recalc=False): # qe_tasks overwrites task-list and is needed if MAP lensrec calls QE lensrec jobs = list(range(len(self.qe_tasks))) for taski, task in enumerate(self.qe_tasks): ## task_dependence ## calc_mf -> calc_phi, calc_blt -> calc_phi, (calc_mf) _jobs = [] if task == 'calc_meanfield': fn_mf = opj(self.libdir_QE, 'qlms_dd/simMF_k1%s_%s.fits' % (self.k, utils.mchash(self.simidxs_mf))) if not os.path.isfile(fn_mf) or recalc: for simidx in self.simidxs_mf: fn_qlm = opj(opj(self.libdir_QE, 'qlms_dd'), 'sim_%s_%04d.fits'%(self.k, simidx) if simidx != -1 else 'dat_%s.fits'%self.k) if not os.path.isfile(fn_qlm) or recalc: _jobs.append(int(simidx)) ## Calculate realization dependent phi, i.e. plm_it000. if task == 'calc_phi': ## this filename must match plancklens filename fn_mf = opj(self.libdir_QE, 'qlms_dd/simMF_k1%s_%s.fits' % (self.k, utils.mchash(self.simidxs_mf))) ## Skip if meanfield already calculated if not os.path.isfile(fn_mf) or recalc: for simidx in self.simidxs: fn_qlm = opj(opj(self.libdir_QE, 'qlms_dd'), 'sim_%s_%04d.fits'%(self.k, simidx) if simidx != -1 else 'dat_%s.fits'%self.k) if not os.path.isfile(fn_qlm) or recalc: _jobs.append(simidx) ## Calculate B-lensing template if task == 'calc_blt': for simidx in self.simidxs: ## this filename must match the one created in get_template_blm() fn_blt = opj(self.libdir_blt, 'blt_%s_%04d_p%03d_e%03d_lmax%s'%(self.k, simidx, 0, 0, self.lm_max_blt[0]) + 'perturbative' * self.blt_pert + '.npy') if not os.path.isfile(fn_blt) or recalc: _jobs.append(simidx) jobs[taski] = _jobs self.jobs = jobs return jobs def init_aniso_filter(self): self.init_cinv() _filter_raw = filt_cinv.library_cinv_sepTP(opj(self.libdir_QE, 'ivfs'), self.simulationdata, self.cinv_t, self.cinv_p, self.cls_len) _ftl_rs = np.ones(self.lm_max_qlm[0] + 1, dtype=float) * (np.arange(self.lm_max_qlm[0] + 1) >= self.lmin_teb[0]) _fel_rs = np.ones(self.lm_max_qlm[0] + 1, dtype=float) * (np.arange(self.lm_max_qlm[0] + 1) >= self.lmin_teb[1]) _fbl_rs = np.ones(self.lm_max_qlm[0] + 1, dtype=float) * (np.arange(self.lm_max_qlm[0] + 1) >= self.lmin_teb[2]) self.ivfs = filt_util.library_ftl(_filter_raw, self.lm_max_qlm[0], _ftl_rs, _fel_rs, _fbl_rs) self.qlms_dd = qest.library_sepTP(opj(self.libdir_QE, 'qlms_dd'), self.ivfs, self.ivfs, self.cls_len['te'], self.nivjob_geominfo[1]['nside'], lmax_qlm=self.lm_max_qlm[0]) # @base_exception_handler #@log_on_start(logging.INFO, "QE.run(task={task}) started") #@log_on_end(logging.INFO, "QE.run(task={task}) finished") def run(self, task=None): ## task may be set from MAP lensrec, as MAP lensrec has prereqs to QE lensrec ## if None, then this is a normal QE lensrec call # Only now instantiate aniso filter if self.qe_filter_directional == 'anisotropic': self.init_aniso_filter() _tasks = self.qe_tasks if task is None else [task] for taski, task in enumerate(_tasks): log.info('{}, task {} started'.format(mpi.rank, task)) if task == 'calc_meanfield': for idx in self.jobs[taski][mpi.rank::mpi.size]: # In principle it is enough to calculate qlms. self.get_sim_qlm(int(idx)) log.info('{}/{}, finished job {}'.format(mpi.rank,mpi.size,idx)) if len(self.jobs[taski])>0: log.info('{} finished qe ivfs tasks. Waiting for all ranks to start mf calculation'.format(mpi.rank)) mpi.barrier() # Tunneling the meanfield-calculation, so only rank 0 calculates it. Otherwise, # some processes will try accessing it too fast, or calculate themselves, which results in # an io error log.info("Done waiting. Rank 0 going to calculate meanfield-file.. everyone else waiting.") if mpi.rank == 0: self.get_meanfield(int(idx)) log.info("rank finished calculating meanfield-file.. everyone else waiting.") mpi.barrier() if task == 'calc_phi': for idx in self.jobs[taski][mpi.rank::mpi.size]: self.get_plm(idx, self.QE_subtract_meanfield) if self.simulationdata.obs_lib.maps == DEFAULT_NotAValue: self.simulationdata.purgecache() if task == 'calc_blt': for simidx in self.jobs[taski][mpi.rank::mpi.size]: # ## Faking here MAP filters self.itlib_iterator = transform(self.MAP_job, iterator_transformer(self.MAP_job, simidx, self.dlensalot_model)) self.get_blt(simidx) if self.simulationdata.obs_lib.maps == DEFAULT_NotAValue: self.simulationdata.purgecache() # @base_exception_handler #@log_on_start(logging.INFO, "QE.get_sim_qlm(simidx={simidx}) started") #@log_on_end(logging.INFO, "QE.get_sim_qlm(simidx={simidx}) finished") def get_sim_qlm(self, simidx): return self.qlms_dd.get_sim_qlm(self.k, int(simidx)) # @base_exception_handler #@log_on_start(logging.INFO, "QE.get_wflm(simidx={simidx}) started") #@log_on_end(logging.INFO, "QE.get_wflm(simidx={simidx}) finished") def get_wflm(self, simidx): if self.k in ['ptt']: return lambda: alm_copy(self.ivfs.get_sim_tmliklm(simidx), None, self.lm_max_unl[0], self.lm_max_unl[1]) elif self.k in ['p_p', 'p_eb', 'peb', 'p_be', 'pee']: return lambda: alm_copy(self.ivfs.get_sim_emliklm(simidx), None, self.lm_max_unl[0], self.lm_max_unl[1]) elif self.k in ['p']: return lambda: np.array([alm_copy(self.ivfs.get_sim_tmliklm(simidx), None, self.lm_max_unl[0], self.lm_max_unl[1]), alm_copy(self.ivfs.get_sim_emliklm(simidx), None, self.lm_max_unl[0], self.lm_max_unl[1])]) # @base_exception_handler #@log_on_start(logging.INFO, "QE.get_R_unl() started") #@log_on_end(logging.INFO, "QE.get_R_unl() finished") def get_R_unl(self): return qresp.get_response(self.k, self.lm_max_ivf[0], self.k[0], self.cls_unl, self.cls_unl, self.fteb_unl, lmax_qlm=self.lm_max_qlm[0])[0] # @base_exception_handler #@log_on_start(logging.INFO, "QE.get_meanfield(simidx={simidx}) started") #@log_on_end(logging.INFO, "QE.get_meanfield(simidx={simidx}) finished") def get_meanfield(self, simidx): ret = np.zeros_like(self.qlms_dd.get_sim_qlm(self.k, 0)) if self.Nmf > 0: if self.mfvar == None: # FIXME plancklens needs to be less restrictive with type for simidx. ret = self.qlms_dd.get_sim_qlm_mf(self.k, [int(simidx_mf) for simidx_mf in self.simidxs_mf]) if simidx in self.simidxs_mf: ret = (ret - self.qlms_dd.get_sim_qlm(self.k, int(simidx)) / self.Nmf) * (self.Nmf / (self.Nmf - 1)) else: ret = hp.read_alm(self.mfvar) if simidx in self.simidxs_mf: ret = (ret - self.qlms_dd_mfvar.get_sim_qlm(self.k, int(simidx)) / self.Nmf) * (self.Nmf / (self.Nmf - 1)) return ret return ret # @base_exception_handler #@log_on_start(logging.INFO, "QE.get_plm(simidx={simidx}, sub_mf={sub_mf}) started") #@log_on_end(logging.INFO, "QE.get_plm(simidx={simidx}, sub_mf={sub_mf}) finished") def get_plm_n1(self, simidx, sub_mf=True, N1=np.array([])): libdir_MAPidx = self.libdir_MAP(self.k, simidx, self.version) if N1.size == 0: N1 = 0 fn_plm = opj(libdir_MAPidx, 'phi_plm_it000.npy') # Note: careful, this one doesn't have a simidx, so make sure it ends up in a simidx_directory (like MAP) else: fn_plm = opj(libdir_MAPidx, 'phi_plm_it000{}.npy'.format('_wN1')) if not os.path.exists(fn_plm): plm = self.qlms_dd.get_sim_qlm(self.k, int(simidx)) #Unormalized quadratic estimate: if sub_mf and self.version != 'noMF': plm -= self.mf(int(simidx)) # MF-subtracted unnormalized QE R = qresp.get_response(self.k, self.lm_max_ivf[0], self.k[0], self.cls_len, self.cls_len, self.ftebl_len, lmax_qlm=self.lm_max_qlm[0])[0] # Isotropic Wiener-filter (here assuming for simplicity N0 ~ 1/R) WF = self.cpp * utils.cli(self.cpp + utils.cli(R) + N1) plm = alm_copy(plm, None, self.lm_max_qlm[0], self.lm_max_qlm[1]) almxfl(plm, utils.cli(R), self.lm_max_qlm[1], True) # Normalized QE almxfl(plm, WF, self.lm_max_qlm[1], True) # Wiener-filter QE almxfl(plm, self.cpp > 0, self.lm_max_qlm[1], True) np.save(fn_plm, plm) return np.load(fn_plm) def get_plm(self, simidx, sub_mf=True): libdir_MAPidx = self.libdir_MAP(self.k, simidx, self.version) fn_plm = opj(libdir_MAPidx, 'phi_plm_it000.npy') # Note: careful, this one doesn't have a simidx, so make sure it ends up in a simidx_directory (like MAP) if not os.path.exists(fn_plm): plm = self.qlms_dd.get_sim_qlm(self.k, int(simidx)) #Unormalized quadratic estimate: if sub_mf and self.version != 'noMF': plm -= self.mf(int(simidx)) # MF-subtracted unnormalized QE R = qresp.get_response(self.k, self.lm_max_ivf[0], self.k[0], self.cls_len, self.cls_len, self.ftebl_len, lmax_qlm=self.lm_max_qlm[0])[0] # Isotropic Wiener-filter (here assuming for simplicity N0 ~ 1/R) WF = self.cpp * utils.cli(self.cpp + utils.cli(R)) plm = alm_copy(plm, None, self.lm_max_qlm[0], self.lm_max_qlm[1]) almxfl(plm, utils.cli(R), self.lm_max_qlm[1], True) # Normalized QE almxfl(plm, WF, self.lm_max_qlm[1], True) # Wiener-filter QE almxfl(plm, self.cpp > 0, self.lm_max_qlm[1], True) np.save(fn_plm, plm) return np.load(fn_plm) #@log_on_start(logging.INFO, "QE.get_response_meanfield() started") #@log_on_end(logging.INFO, "QE.get_response_meanfield() finished") def get_response_meanfield(self): if self.k in ['p_p'] and not 'noRespMF' in self.version: mf_resp = qresp.get_mf_resp(self.k, self.cls_unl, {'ee': self.ftebl_len['e'], 'bb': self.ftebl_len['b']}, self.lm_max_ivf[0], self.lm_max_qlm[0])[0] else: log.info('*** mf_resp not implemented for key ' + self.k, ', setting it to zero') mf_resp = np.zeros(self.lm_max_qlm[0] + 1, dtype=float) return mf_resp # @base_exception_handler #@log_on_start(logging.INFO, "QE.get_meanfield_normalized(simidx={simidx}) started") #@log_on_end(logging.INFO, "QE.get_meanfield_normalized(simidx={simidx}) finished") def get_meanfield_normalized(self, simidx): mf_QE = copy.deepcopy(self.get_meanfield(simidx)) R = qresp.get_response(self.k, self.lm_max_ivf[0], 'p', self.cls_len, self.cls_len, self.ftebl_len, lmax_qlm=self.lm_max_qlm[0])[0] WF = self.cpp * utils.cli(self.cpp + utils.cli(R)) almxfl(mf_QE, utils.cli(R), self.lm_max_qlm[1], True) # Normalized QE almxfl(mf_QE, WF, self.lm_max_qlm[1], True) # Wiener-filter QE almxfl(mf_QE, self.cpp > 0, self.lm_max_qlm[1], True) return mf_QE # @base_exception_handler #@log_on_start(logging.INFO, "QE.get_blt({simidx}) started") #@log_on_end(logging.INFO, "QE.get_blt({simidx}) finished") def get_blt(self, simidx): fn_blt = opj(self.libdir_blt, 'blt_%s_%04d_p%03d_e%03d_lmax%s'%(self.k, simidx, 0, 0, self.lm_max_blt[0]) + 'perturbative' * self.blt_pert + '.npy') if not os.path.exists(fn_blt): ## For QE, dlm_mod by construction doesn't do anything, because mean-field had already been subtracted from plm and we don't want to repeat that. dlm_mod = np.zeros_like(self.qlms_dd.get_sim_qlm(self.k, int(simidx))) blt = self.itlib_iterator.get_template_blm(0, 0, lmaxb=self.lm_max_blt[0], lmin_plm=self.Lmin, dlm_mod=dlm_mod, perturbative=self.blt_pert, k=self.k) np.save(fn_blt, blt) return np.load(fn_blt) # @base_exception_handler #@log_on_start(logging.INFO, "QE.get_blt({simidx}) started") #@log_on_end(logging.INFO, "QE.get_blt({simidx}) finished") def get_blt_new(self, simidx): def get_template_blm(it, it_e, lmaxb=1024, lmin_plm=1, perturbative=False): fn_blt = 'blt_p%03d_e%03d_lmax%s'%(it, it_e, lmaxb) fn_blt += 'perturbative' * perturbative elm_wf = self.filter.transf assert Alm.getlmax(elm_wf.size, self.mmax_filt) == self.lmax_filt mmaxb = lmaxb dlm = self.get_hlm(it, 'p') self.hlm2dlm(dlm, inplace=True) almxfl(dlm, np.arange(self.lmax_qlm + 1, dtype=int) >= lmin_plm, self.mmax_qlm, True) if perturbative: # Applies perturbative remapping get_alm = lambda a: elm_wf if a == 'e' else np.zeros_like(elm_wf) geom, sht_tr = self.filter.ffi.geom, self.filter.ffi.sht_tr d1 = geom.alm2map_spin([dlm, np.zeros_like(dlm)], 1, self.lmax_qlm, self.mmax_qlm, sht_tr, [-1., 1.]) dp = utils_qe.qeleg_multi([2], +3, [utils_qe.get_spin_raise(2, self.lmax_filt)])(get_alm, geom, sht_tr) dm = utils_qe.qeleg_multi([2], +1, [utils_qe.get_spin_lower(2, self.lmax_filt)])(get_alm, geom, sht_tr) dlens = -0.5 * ((d1[0] - 1j * d1[1]) * dp + (d1[0] + 1j * d1[1]) * dm) del dp, dm, d1 elm, blm = geom.map2alm_spin([dlens.real, dlens.imag], 2, lmaxb, mmaxb, sht_tr, [-1., 1.]) else: # Applies full remapping (this will re-calculate the angles) ffi = self.filter.ffi.change_dlm([dlm, None], self.mmax_qlm) elm, blm = ffi.lensgclm(np.array([elm_wf, np.zeros_like(elm_wf)]), self.mmax_filt, 2, lmaxb, mmaxb) self.blt_cacher.cache(fn_blt, blm) return blm fn_blt = opj(self.libdir_QE, 'BLT/blt_%s_%04d_p%03d_e%03d_lmax%s'%(self.k, simidx, 0, 0, self.lm_max_blt[0]) + 'perturbative' * self.blt_pert + '.npy') if not os.path.exists(fn_blt): blt = get_template_blm(0, 0, lmaxb=self.lm_max_blt[0], lmin_plm=self.Lmin, perturbative=self.blt_pert) np.save(fn_blt, blt) return np.load(fn_blt) #@log_on_start(logging.INFO, "get_filter() started") #@log_on_end(logging.INFO, "get_filter() finished") def get_filter(self): QE_filters = transform(self, QE_transformer()) filter = transform(self, QE_filters()) return filter
[docs]class MAP_lr(Basejob): """Iterative lensing reconstruction Job. Depends on class QE_lr, and class Sim_generator. Performs tasks such as lensing reconstruction, mean-field calculation, and B-lensing template calculation. """ @check_MPI def __init__(self, dlensalot_model): super().__init__(dlensalot_model) # TODO Only needed to hand over to ith() self.dlensalot_model = dlensalot_model # FIXME remnant of previous solution how jobs were dependent on each other. This can perhaps be simplified now. self.simgen = Sim_generator(dlensalot_model) self.simulationdata = self.simgen.simulationdata self.qe = QE_lr(dlensalot_model, caller=self) self.qe.simulationdata = self.simgen.simulationdata # just to be sure, so we have a single truth in MAP_lr. ## tasks -> mf_dirname if "calc_meanfield" in self.it_tasks or 'calc_blt' in self.it_tasks: if not os.path.isdir(self.mf_dirname) and mpi.rank == 0: os.makedirs(self.mf_dirname) # sims -> sims_MAP if self.it_filter_directional == 'anisotropic': self.sims_MAP = utils_sims.ztrunc_sims(self.simulationdata, self.nivjob_geominfo[1]['nside'], [self.zbounds]) if self.k in ['ptt']: self.niv = self.sims_MAP.ztruncify(read_map(self.nivt_desc)) # inverse pixel noise map on consistent geometry else: self.niv = [self.sims_MAP.ztruncify(read_map(ni)) for ni in self.nivp_desc] # inverse pixel noise map on consistent geometry elif self.it_filter_directional == 'isotropic': self.sims_MAP = self.simulationdata self.filter = self.get_filter() # # @base_exception_handler #@log_on_start(logging.INFO, "MAP.map.collect_jobs() started") #@log_on_end(logging.INFO, "MAP.collect_jobs() finished: jobs={self.jobs}") def collect_jobs(self): jobs = list(range(len(self.it_tasks))) # TODO order of task list matters, but shouldn't for taski, task in enumerate(self.it_tasks): _jobs = [] if task == 'calc_phi': ## Here I only want to calculate files not calculated before, and only for the it job tasks. ## i.e. if no blt task in iterator job, then no blt task in QE job for simidx in self.simidxs: libdir_MAPidx = self.libdir_MAP(self.k, simidx, self.version) if rec.maxiterdone(libdir_MAPidx) < self.itmax: _jobs.append(simidx) ## Calculate realization independent meanfields up to iteration itmax ## prereq: plms exist for itmax. maxiterdone won't work if calc_phi in task list elif task == 'calc_meanfield': for simidx in self.simidxs_mf: libdir_MAPidx = self.libdir_MAP(self.k, simidx, self.version) if "calc_phi" in self.it_tasks: _jobs.append(0) elif rec.maxiterdone(libdir_MAPidx) < self.itmax: _jobs.append(0) elif task == 'calc_blt': for simidx in self.simidxs: fns_blt = np.array([opj(self.libdir_blt, 'blt_%s_%04d_p%03d_e%03d_lmax%s'%(self.k, simidx, it, it, self.lm_max_blt[0]) + '.npy') for it in np.arange(1,self.itmax+1)]) if not np.all([os.path.exists(fn_blt) for fn_blt in fns_blt]): _jobs.append(simidx) jobs[taski] = _jobs self.jobs = jobs return jobs # @base_exception_handler #@log_on_start(logging.INFO, "MAP.run() started") #@log_on_end(logging.INFO, "MAP.run() finished") def run(self): for taski, task in enumerate(self.it_tasks): log.info('{}, task {} started, jobs: {}'.format(mpi.rank, task, self.jobs[taski])) if task == 'calc_phi': for simidx in self.jobs[taski][mpi.rank::mpi.size]: libdir_MAPidx = self.libdir_MAP(self.k, simidx, self.version) if self.itmax >= 0 and rec.maxiterdone(libdir_MAPidx) < self.itmax: itlib_iterator = transform(self, iterator_transformer(self, simidx, self.dlensalot_model)) for it in range(self.itmax + 1): itlib_iterator.chain_descr = self.it_chain_descr(self.lm_max_unl[0], self.it_cg_tol(it)) itlib_iterator.soltn_cond = self.soltn_cond(it) itlib_iterator.iterate(it, 'p') log.info('{}, simidx {} done with it {}'.format(mpi.rank, simidx, it)) if self.simulationdata.obs_lib.maps == DEFAULT_NotAValue: self.simulationdata.purgecache() if task == 'calc_meanfield': # TODO I don't like barriers and not sure if they are still needed mpi.barrier() self.get_meanfields_it(np.arange(self.itmax+1), calc=True) mpi.barrier() if task == 'calc_blt': for simidx in self.jobs[taski][mpi.rank::mpi.size]: self.libdir_MAPidx = self.libdir_MAP(self.k, simidx, self.version) self.itlib_iterator = transform(self, iterator_transformer(self, simidx, self.dlensalot_model)) for it in range(self.itmax + 1): self.get_blt_it(simidx, it) if self.simulationdata.obs_lib.maps == DEFAULT_NotAValue: self.simulationdata.purgecache() # # @base_exception_handler #@log_on_start(logging.INFO, "MAP.get_plm_it(simidx={simidx}, its={its}) started") #@log_on_end(logging.INFO, "MAP.get_plm_it(simidx={simidx}, its={its}) finished") def get_plm_it(self, simidx, its): plms = rec.load_plms(self.libdir_MAP(self.k, simidx, self.version), its) return plms # # @base_exception_handler #@log_on_start(logging.INFO, "MAP.get_meanfield_it(it={it}, calc={calc}) started") #@log_on_end(logging.INFO, "MAP.get_meanfield_it(it={it}, calc={calc}) finished") def get_meanfield_it(self, it, calc=False): fn = opj(self.mf_dirname, 'mf%03d_it%03d.npy'%(self.Nmf, it)) if not calc: if os.path.isfile(fn): mf = np.load(fn) else: mf = self.get_meanfield_it(self, it, calc=True) else: plm = rec.load_plms(self.libdir_MAP(self.k, self.simidxs[0], self.version), [0])[-1] mf = np.zeros_like(plm) for simidx in self.simidxs_mf: log.info("it {:02d}: adding sim {:03d}/{}".format(it, simidx, self.Nmf-1)) mf += rec.load_plms(self.libdir_MAP(self.k, simidx, self.version), [it])[-1] np.save(fn, mf/self.Nmf) return mf # @base_exception_handler #@log_on_start(logging.INFO, "MAP.get_meanfields_it(its={its}, calc={calc}) started") #@log_on_end(logging.INFO, "MAP.get_meanfields_it(its={its}, calc={calc}) finished") def get_meanfields_it(self, its, calc=False): plm = rec.load_plms(self.libdir_MAP(self.k, self.simidxs[0], self.version), [0])[-1] mfs = np.zeros(shape=(len(its),*plm.shape), dtype=np.complex128) if calc==True: for iti, it in enumerate(its[mpi.rank::mpi.size]): mfs[iti] = self.get_meanfield_it(it, calc=calc) mpi.barrier() for iti, it in enumerate(its[mpi.rank::mpi.size]): mfs[iti] = self.get_meanfield_it(it, calc=False) return mfs # @base_exception_handler #@log_on_start(logging.INFO, "MAP.get_blt_it(simidx={simidx}, it={it}) started") #@log_on_end(logging.INFO, "MAP.get_blt_it(simidx={simidx}, it={it}) finished") def get_blt_it(self, simidx, it): if it == 0: self.qe.itlib_iterator = transform(self, iterator_transformer(self, simidx, self.dlensalot_model)) return self.qe.get_blt(simidx) fn_blt = opj(self.libdir_blt, 'blt_%s_%04d_p%03d_e%03d_lmax%s'%(self.k, simidx, it, it, self.lm_max_blt[0]) + '.npy') if not os.path.exists(fn_blt): self.libdir_MAPidx = self.libdir_MAP(self.k, simidx, self.version) dlm_mod = np.zeros_like(rec.load_plms(self.libdir_MAPidx, [0])[0]) if self.dlm_mod_bool and it>0 and it<=rec.maxiterdone(self.libdir_MAPidx): dlm_mod = self.get_meanfields_it([it], calc=False) if simidx in self.simidxs_mf: dlm_mod = (dlm_mod - np.array(rec.load_plms(self.libdir_MAPidx, [it]))/self.Nmf) * self.Nmf/(self.Nmf - 1) if it<=rec.maxiterdone(self.libdir_MAPidx): blt = self.itlib_iterator.get_template_blm(it, it-1, lmaxb=self.lm_max_blt[0], lmin_plm=np.max([self.Lmin,5]), dlm_mod=dlm_mod, perturbative=False, k=self.k) np.save(fn_blt, blt) return np.load(fn_blt) #@log_on_start(logging.INFO, "get_filter() started") #@log_on_end(logging.INFO, "get_filter() finished") def get_filter(self): MAP_filters = transform(self, MAP_transformer()) filter = transform(self, MAP_filters()) return filter
[docs]class Map_delenser(Basejob): """Map delenser Job for calculating delensed ILC and Blens spectra using precaulculated Btemplates as input. This is a combination of, * delensing with Btemplates (QE, MAP), * choosing power spectrum calculation as in binning, masking, and templating """ @check_MPI def __init__(self, dlensalot_model): super().__init__(dlensalot_model) self.lib = dict() if 'nlevel' in self.binmasks: self.lib.update({'nlevel': {}}) if 'mask' in self.binmasks: self.lib.update({'mask': {}}) self.simgen = Sim_generator(dlensalot_model) self.libdir_delenser = opj(self.TEMP, 'delensing/{}'.format(self.dirid)) if not(os.path.isdir(self.libdir_delenser)): os.makedirs(self.libdir_delenser) self.fns = opj(self.libdir_delenser, 'ClBB_sim%04d.npy') # @base_exception_handler #@log_on_start(logging.INFO, "collect_jobs() started") #@log_on_end(logging.INFO, "collect_jobs() finished: jobs={self.jobs}") def collect_jobs(self): # TODO a valid job is any requested job?, as BLTs may also be on CFS jobs = [] for idx in self.simidxs: jobs.append(idx) self.jobs = jobs return jobs # @base_exception_handler #@log_on_start(logging.INFO, "run() started") #@log_on_end(logging.INFO, "run() finished") def run(self): outputdata = self._prepare_job() if self.jobs != []: for simidx in self.jobs[mpi.rank::mpi.size]: log.debug('will store file at: {}'.format(self.fns.format(simidx))) self.delens(simidx, outputdata) def _prepare_job(self): if self.binning == 'binned': outputdata = np.zeros(shape=(2, 2+len(self.its), len(self.nlevels)+len(self.masks_fromfn), len(self.edges)-1)) for maskflavour, masks in self.binmasks.items(): for maskid, mask in masks.items(): ## for a future me: ell-max of clc_templ must be edges[-1], lmax_mask can be anything... self.lib[maskflavour].update({maskid: self.cl_calc.map2cl_binned(mask, self.clc_templ, self.edges, self.lmax_mask)}) elif self.binning == 'unbinned': for maskflavour, masks in self.binmasks.items(): for maskid, mask in masks.items(): a = overwrite_anafast() if self.cl_calc == hp else masked_lib(mask, self.cl_calc, self.lmax, self.lmax_mask) outputdata = np.zeros(shape=(2, 2+len(self.its), len(self.nlevels)+len(self.masks_fromfn), self.lmax+1)) self.lib[maskflavour].update({maskid: a}) return outputdata # #@log_on_start(logging.INFO, "get_basemap() started") # #@log_on_end(logging.INFO, "get_basemap() finished") def get_basemap(self, simidx): if self.basemap == 'lens': return almxfl(alm_copy(self.simulationdata.get_sim_sky(simidx, space='alm', spin=0, field='polarization')[1], self.simulationdata.lmax, *self.lm_max_blt), self.ttebl['e'], self.lm_max_blt[0], inplace=False) else: # only checking for map to save some memory.. if np.all(self.simulationdata.maps == DEFAULT_NotAValue): return alm_copy(self.simulationdata.get_sim_obs(simidx, space='alm', spin=0, field='polarization')[1], self.simulationdata.lmax, *self.lm_max_blt) else: return hp.map2alm_spin(self.simulationdata.get_sim_obs(simidx, space='map', spin=2, field='polarization'), spin=2, lmax=self.lm_max_blt[0], mmax=self.lm_max_blt[1])[1] #@log_on_start(logging.INFO, "_delens() started") #@log_on_end(logging.INFO, "_delens() finished") def delens(self, simidx, outputdata): blm_L = self.get_basemap(simidx) blt_QE = self.get_blt_it(simidx, 0) bdel_QE = self.nivjob_geomlib.alm2map(blm_L-blt_QE, *self.lm_max_blt, nthreads=4) maskcounter = 0 for maskflavour, masks in self.binmasks.items(): for maskid, mask in masks.items(): log.debug("starting mask {} {}".format(maskflavour, maskid)) bcl_L = self.lib[maskflavour][maskid].map2cl(self.nivjob_geomlib.alm2map(blm_L, *self.lm_max_blt, nthreads=4)) outputdata[0][0][maskcounter] = bcl_L blt_L_QE = self.lib[maskflavour][maskid].map2cl(bdel_QE) outputdata[0][1][maskcounter] = blt_L_QE for iti, it in enumerate(self.its): blt_MAP = self.get_blt_it(simidx, it) bdel_MAP = self.nivjob_geomlib.alm2map(blm_L-blt_MAP, *self.lm_max_blt, nthreads=4) log.info("starting MAP delensing for iteration {}".format(it)) blt_L_MAP = self.lib[maskflavour][maskid].map2cl(bdel_MAP) outputdata[0][2+iti][maskcounter] = blt_L_MAP maskcounter+=1 np.save(self.fns.format(simidx), outputdata) #@log_on_start(logging.INFO, "get_residualblens() started") #@log_on_end(logging.INFO, "get_residualblens() finished") def get_residualblens(self, simidx, it): basemap = self.get_basemap(simidx) return basemap - self.get_blt_it(simidx, it) # @base_exception_handler #@log_on_start(logging.INFO, "read_data() started") #@log_on_end(logging.INFO, "read_data() finished") def read_data(self): bcl_L = np.zeros(shape=(len(self.its)+2, len(self.nlevels)+len(self.masks_fromfn), len(self.simidxs), len(self.edges)-1)) for simidxi, simidx in enumerate(self.simidxs): data = np.load(self.fns.format(simidx)) bcl_L[0,:,simidxi] = data[0][0] bcl_L[1,:,simidxi] = data[0][1] for iti, it in enumerate(self.its): bcl_L[2+iti,:,simidxi] = data[0][2+iti] return bcl_L def hlm2dlm(self, hlm, inplace): if self.h == 'd': return hlm if inplace else hlm.copy() if self.h == 'p': h2d = np.sqrt(np.arange(self.lmax_qlm + 1, dtype=float) * np.arange(1, self.lmax_qlm + 2, dtype=float)) elif self.h == 'k': h2d = cli(0.5 * np.sqrt(np.arange(self.lmax_qlm + 1, dtype=float) * np.arange(1, self.lmax_qlm + 2, dtype=float))) else: assert 0, self.h + ' not implemented' if inplace: almxfl(hlm, h2d, self.mmax_qlm, True) else: return almxfl(hlm, h2d, self.mmax_qlm, False)
class overwrite_anafast(): """Convenience class for overwriting method name """ def map2cl(self, *args, **kwargs): return hp.anafast(*args, **kwargs) class masked_lib: """Convenience class for handling method names """ def __init__(self, mask, cl_calc, lmax, lmax_mask): self.mask = mask self.cl_calc = cl_calc self.lmax = lmax self.lmax_mask = lmax_mask def map2cl(self, map): return self.cl_calc.map2cl(map, self.mask, self.lmax, self.lmax_mask)