Source code for xpipe.xhandle.xwrap

"""
Handles calling xshear with multiprocessing (OpenMP-style single node calculations)
"""
from __future__ import print_function, division
import os
import copy
import numpy as np
import subprocess as sp
import multiprocessing as mp
from collections import OrderedDict
import pandas as pd

from ..tools.selector import partition
from .. import paths

BADVAL = -9999.
np.seterr('warn')


###################################################################
# xshear config file generation

[docs]def get_head(params=None): """ Returns the cosmology config part of an *XSHEAR* config file Parameters ---------- params : dict Pipeline settings in a dictionary format. If :code:`None` then the default :py:data:`paths.params` will be used Returns ------- dict shear settings """ if params is None: params = paths.params head = OrderedDict([ ('H0', params["cosmo_params"]["H0"]), ('omega_m', params["cosmo_params"]["Om0"]), ('healpix_nside', 64), ('mask_style', 'none'), ]) return head
[docs]def get_shear(params=None): """ Returns the shear config part of an *XSHEAR* config file Parameters ---------- params : dict Pipeline settings in a dictionary format. If :code:`None` then the default :py:data:`paths.params` will be used Returns ------- dict shear settings """ if params is None: params = paths.params shear = OrderedDict([ ('shear_style', params['shear_style']), ('sigmacrit_style', 'sample'), ('shear_units', 'both'), ('sourceid_style', 'index'), ('weight_style', params['weight_style']), ]) return shear
[docs]def get_redges(params=None): """ Returns the radial binning config part of an *XSHEAR* config file Parameters ---------- params : dict Pipeline settings in a dictionary format. If :code:`None` then the default :py:data:`paths.params` will be used Returns ------- dict radial binning settings """ if params is None: params = paths.params redges = OrderedDict([ ('rmin', params['radial_bins']['rmin']), ('rmax', params['radial_bins']['rmax']), ('nbin', params['radial_bins']['nbin']), ('r_units', params['radial_bins']['units']) ]) return redges
[docs]def get_pairlog(params=None): """ Returns the source-lens pair logging config part of an *XSHEAR* config file Parameters ---------- params : dict Pipeline settings in a dictionary format. If :code:`None` then the default :py:data:`paths.params` will be used Returns ------- dict source-lens pair logging settings """ if params is None: params = paths.params pairlog = OrderedDict([ ('pairlog_rmin', params['pairlog']['pairlog_rmin']), ('pairlog_rmax', params['pairlog']['pairlog_rmax']), ('pairlog_nmax', params['pairlog']['pairlog_nmax']), ]) return pairlog
pairlog_nopairs = OrderedDict([ ('pairlog_rmin', 0), ('pairlog_rmax', 0), ('pairlog_nmax', 0), ]) tail = OrderedDict([ ('zdiff_min', 0.1), ]) tail_uniform = OrderedDict([ ('zdiff_min', 0.1), ('weight_style', "uniform"), ]) def get_default_xshear_settings(params=None): """The baseline settings for a Metacal lensing measurement""" default_xshear_settings = { 'head': get_head(params=params), 'shear': OrderedDict([ ('shear_style', 'metacal'), ('sigmacrit_style', 'sample'), ('shear_units', 'both'), ('sourceid_style', 'index'), ]), 'redges': get_redges(params=params), 'pairlog': pairlog_nopairs, 'tail': tail, } return default_xshear_settings
[docs]def addlines(cfg, odict): """appends lines to config file""" for key in odict.keys(): line = key + ' = ' + str(odict[key]) + '\n' cfg.write(line)
[docs]def write_custom_xconf(fname, xsettings=None, params=None): """ Writes custom *XSHEAR* config file Parameters ---------- fname : str path name for config file xsettings : dict custom settings params : dict Pipeline settings in a dictionary format. If :code:`None` then the default :py:data:`paths.params` will be used """ settings = get_default_xshear_settings(params=params) if xsettings is not None: settings.update(xsettings) with open(fname, 'w+') as cfg: addlines(cfg, settings['head']) addlines(cfg, settings['shear']) addlines(cfg, settings['redges']) addlines(cfg, settings['pairlog']) addlines(cfg, settings['tail'])
[docs]def write_xconf(fname, pairs=True): """ Writes simple *XSHEAR* config file based on :py:data:`paths.params` Parameters ---------- fname : str path name for config file pairs pairs : bool flag to log source-lens pairs """ with open(fname, 'w+') as cfg: addlines(cfg, get_head()) addlines(cfg, get_shear()) addlines(cfg, get_redges()) if pairs: addlines(cfg, get_pairlog()) else: addlines(cfg, pairlog_nopairs) addlines(cfg, tail)
###################################################################
[docs]def get_main_source_settings(params=None): """Load settings for unsheared METACAL run *with* pairlogging""" if params is None: params = paths.params main_source_settings = { "shear": OrderedDict([ ('shear_style', "metacal"), ('sigmacrit_style', 'sample'), ('shear_units', 'both'), ('sourceid_style', 'index'), ]), "pairlog": get_pairlog(params=params) } return main_source_settings
[docs]def get_main_source_settings_nopairs(): """Load settings for unsheared METACAL run *with out* pairlogging""" main_source_settings_nopairs = { "shear": OrderedDict([ ('shear_style', "metacal"), ('sigmacrit_style', 'sample'), ('shear_units', 'both'), ('sourceid_style', 'index'), ]), "pairlog": pairlog_nopairs } return main_source_settings_nopairs
sheared_tags = ["_1p", "_1m", "_2p", "_2m"] sheared_tags_long = ["_sheared_1p", "_sheared_1m", "_sheared_2p", "_sheared_2m"] sheared_source_settings = { "shear": OrderedDict([ ('shear_style', "reduced"), ('sigmacrit_style', 'sample'), ('shear_units', 'both'), ('sourceid_style', 'index'), ]), "pairlog": pairlog_nopairs }
[docs]def get_metanames(fnames): """ Calculates METACAL-style sheared file names Parameters ---------- fnames : list input file names Returns ------- list * METANAMES, the fnames appended by the :code:`sheared_tags` """ metanames = [] for tag in sheared_tags: _fnames = [] for fname in fnames: _fnames.append(fname.replace(".dat", tag + ".dat")) metanames.append(_fnames) return metanames
################################################################### # rotated shear catalog
[docs]def get_rot_seeds(nrot, seed_master): """Radnom generates seeds for random rotations using the master seed""" rng = np.random.RandomState(seed=seed_master) smax = np.iinfo(np.uint32).max seed_rots = rng.randint(low=0, high=smax, size=nrot) return seed_rots
[docs]def rot2d(e1, e2, alpha): """2D counterclockwise roation matrix""" e1n = e1 * np.cos(alpha) - e2 * np.sin(alpha) e2n = e1 * np.sin(alpha) + e2 * np.cos(alpha) return e1n, e2n
[docs]class CatRotator(object): def __init__(self, fname, seed=5, e_inds=(3, 4)): """ Loads shear catalog, and saves a randomly rotated version Parameters ---------- fname : str path to the shear catalog (ascii format) seed : int random seed for rotation e_inds : list inxed for :c;Pode:`e1` and :code:`e2` column of the shear catalog Notes ----- Steps: * read catalog from ascii file * rotate catalog based on random seed * write catalog to file. The output file name is defined by the prefix: :code:`rot_seed' + str(self.seed) + '_'` The rotated catalog file is later deleted after the calculations finished """ self.fname = fname self.seed = seed self.rng = np.random.RandomState(seed=self.seed) self.cat = None self.get_oname() if len(e_inds) == 2 and e_inds[1] - e_inds[0] == 1: self.ind_e1 = e_inds[0] self.ind_e2 = e_inds[1] else: raise ValueError('e_inds must contain two consecutive integer values indexing e1, and e2')
[docs] def get_oname(self): """get output name""" head, tail = os.path.split(self.fname) self.oname = head + '/rot_seed' + str(self.seed) + '_' + tail
def _rotcat(self): """apply random rotation to shears""" e1 = self.cat[:, self.ind_e1] e2 = self.cat[:, self.ind_e2] alpha = self.rng.uniform(low=0.0, high=2. * np.pi, size=len(e1)) e1n, e2n = rot2d(e1, e2, alpha) self.cat[:, self.ind_e1] = e1n self.cat[:, self.ind_e2] = e2n
[docs] def readcat(self): """read shear catalog""" self.cat = pd.read_csv(self.fname, sep=' ', header=None).values
[docs] def writecat(self): """write *rotated* shear catalog""" fmt = ['%d', ] + (self.cat.shape[1] - 1) * ['%.6f'] np.savetxt(self.oname, self.cat, fmt=fmt)
[docs] def rmcat(self): """delete shear catalog""" try: os.remove(self.oname) except OSError: pass
[docs] def rotate(self): """Perform a random rotation on the shear catalog""" print(' starting reading') self.readcat() print(' rotating with seed = ' + str(self.seed)) self._rotcat() print(' writing...') self.writecat() print(' finished')
[docs]def single_rotate(flist, seed_rot, metasel=False, head=False): """ runs one single rotation of the source catalog with METACAL SELECTION RESPONSES Parameters ---------- flist : list list of individual *xshear* input files seed_rot : int random seed to be used in the rotation metasel : bool whether to perform the rotation in calculating the metacal selection responses also. head : bool whether to use only the first few rows of the shear catalog """ #"""runs one single rotation of the source catalog with METACAL SELECTION RESPONSES""" print(" writing xshear input") xpath = paths.dirpaths["xin"] + "/" + paths.params["tag"] + "/" + paths.params["tag"] + \ "_xconfig_rot_seed" + str(seed_rot) + ".cfg" write_custom_xconf(xpath, xsettings=get_main_source_settings_nopairs()) print(' creating main catalog') sname = paths.fullpaths[paths.params['shear_to_use']] cr = CatRotator(sname, seed=seed_rot) cr.rotate() print(' running measurement') clust_infos = create_infodict(flist, pairs=False, head=head, rotate=True, seed=seed_rot, shape_path=cr.oname, xconfig=xpath) multi_xrun(clust_infos, nprocess=paths.params['nprocess']) print(' finished...') print(' removing catalog') cr.rmcat() # starting runs for metacal selection responses if metasel: print(' creating sheared catalogs') for tag in sheared_tags: xpath_sheared = paths.dirpaths["xin"] + "/" + paths.params["tag"] + \ "xconfig" + tag + "_rot_seed" + str(seed_rot) + ".cfg" write_custom_xconf(xpath_sheared, xsettings=sheared_source_settings) sname = paths.fullpaths[paths.params['shear_to_use']].replace(".dat", tag + ".dat") cr = CatRotator(sname, seed=seed_rot) cr.rotate() print(' running sheared measurement', tag) clust_infos = create_infodict(flist, pairs=False, head=head, rotate=True, seed=seed_rot, shape_path=cr.oname, metatag=tag, xconfig=xpath_sheared) multi_xrun(clust_infos, nprocess=paths.params['nprocess']) print(' finished...') print(' removing catalog') cr.rmcat()
[docs]def serial_rotate(flist, metasel=False, nrot=20, head=False, seed_master=5): """ performs the random rotations serially and saves them to file Parameters ---------- flist : list list of individual *xshear* input files metasel : bool whether to perform the rotation in calculating the metacal selection responses also. nrot : int number of rotations to perform head : bool whether to use only the first few rows of the shear catalog seed_master : int master seed to generate random seeds for the rotating the entire catalog """ seed_rots = get_rot_seeds(nrot=nrot, seed_master=seed_master) for i, seed_rot in enumerate(seed_rots): print('rotation ', i) single_rotate(flist, seed_rot, metasel=metasel, head=head)
[docs]def chunkwise_rotate(flist, metasel=False, nrot=20, nchunks=1, ichunk=0, head=False, seed_master=5): """ Performs the random rotations and saves them to file Parameters ---------- flist : list list of individual *xshear* input files metasel : bool whether to perform the rotation in calculating the metacal selection responses also. nrot : int number of rotations in total nchunks : int number of chunks to divide the task into ichunk : int index of the current chunk head : bool whether to use only the first few rows of the shear catalog seed_master : int master seed to generate random seeds for the rotating the entire catalog """ seed_rots = get_rot_seeds(nrot=nrot, seed_master=seed_master) seed_chunks = partition(seed_rots, nchunks) print('using seeds: ', seed_chunks[ichunk]) for i, seed_rot in enumerate(seed_chunks[ichunk]): print('rotation ', i) single_rotate(flist, seed_rot, metasel=metasel, head=head)
################################################################### def extract_pairs_bins(infiles, jk=True): """ Extracts pairs files and bin IDs from input file lists Parameters ---------- infiles : list absolute file paths for the input file lists jk : bool if True, assumes a nested list where the top level list contains paramter bins, and the sub-lists consist of the file paths for individual JackKnife patches Returns ------- list, list Pairs files (nesting depends on the :code:`jk` flag bin IDs """ if jk: bin_vals = [] pairs_files = [] for names in infiles: clust_info = create_infodict(names) pairs_file = [info["pairsfile"] for info in clust_info] pairs_files.append(pairs_file) bin_val = np.array(pairs_file[0].split("_qbin-")[1].split("_patch")[0].split("-"), dtype=int) bin_vals.append(bin_val) bin_vals = np.array(bin_vals) else: clust_info = create_infodict(infiles) pairs_files = [info["pairsfile"] for info in clust_info] bin_vals = np.array([name.split("_qbin-")[1].split(".dat")[0].split("-") for name in infiles], dtype=int) print(bin_vals) return pairs_files, np.array(bin_vals)
[docs]def create_infodict(flist, head=False, pairs=False, seed=None, rotate=False, seed_tag="", shape_path=None, metatag=None, xconfig=None, params=None, dirpaths=None, fullpaths=None, src_bins=(0,)): """ Creates configuration dictionary which can be passed to multiprocessing map_async() Parameters ---------- flist : list list of individual *xshear* input files head : bool whether to use only the first few rows of the shear catalog pairs : bool whether to log source-lens pairs seed : int random seed rotate : bool whether to perform random rotations seed_tag : str additional comment to write to each file. default is ``"_seed" + str(seed)`` shape_path : str absolute path to the source catalog. If None the default path will be loaded from :py:data:`path.params` metatag : str tag to indicate which metacalibration sheared version this run corresponds to xconfig : str absolute path to xshear executable params : dict Pipeline settings in a dictionary format. If :code:`None` then the default :py:data:`paths.params` will be used dirpaths : dict Pipeline directory paths in a dictionary format. If :code:`None` then the default :py:data:`paths.fullpaths` will be used fullpaths : dict Pipeline file paths in a dictionary format. If :code:`None` then the default :py:data:`paths.fullpaths` will be used Returns ------- list of dict """ if params is None and dirpaths is None and fullpaths is None: params = paths.params dirpaths = paths.dirpaths fullpaths = paths.fullpaths elif None in (params, dirpaths, fullpaths): raise SyntaxError("Some of the arguments are left at default," " which results in inconsistent behaviour. " "If using custom input, define all arguments!") iroot = dirpaths["xin"] + "/" + params["tag"] + "/" oroot = dirpaths["xout"] + "/" + params["tag"] + "/" if rotate: seed_tag += '_seed' + str(seed) if metatag is None: metatag = "" if shape_path is None: shape_path = fullpaths[params['shear_to_use']] if xconfig is None: xconfig = dirpaths["xin"] + "/" + params["tag"] + "/" + params["tag"] + "_xconfig.dat" infodicts = [] for file in flist: for isrc in src_bins: src_tag = "" _spath = shape_path if len(src_bins) > 1: src_tag = "_bin" + str(isrc + 1) _spath = shape_path + src_tag + metatag + ".dat" infodict_raw = { 'infile': iroot + file, 'outfile': oroot + file.split('.dat')[0] + src_tag + seed_tag + '_result' + metatag + '.dat', 'pairsfile': oroot + file.split('.dat')[0] + src_tag + seed_tag + '_result_pairs' + metatag + '.dat', 'logfile': oroot + file.split('.dat')[0] + src_tag + seed_tag + '_result_log' + metatag + '.dat', 'head': head, 'pairs': pairs, 'seed': seed, 'rotate_shears': rotate, 'shape_path': _spath, 'xconfig': xconfig, } infodicts.append(infodict_raw) return infodicts
[docs]def call_xshear(infodict): """ Calls xshear in a single process Run is based on the passed information dict Parameters ---------- infodict : dict A single list element returned from :py:func:`create_infodict` """ infile = infodict['infile'] outfile = infodict['outfile'] pairsfile = infodict['pairsfile'] logfile = infodict['logfile'] shape_path = infodict['shape_path'] if 'head' in infodict.keys() and infodict['head']: cmd1 = 'head -n ' + str(infodict['head']) + " " + shape_path else: cmd1 = 'cat ' + shape_path if 'xconfig' in infodict.keys() and infodict['xconfig'] is not None: xconfig = infodict['xconfig'] else: xconfig = paths.fullpaths['xconfig'] print(xconfig) cmd2 = paths.fullpaths['xpath'] + ' ' + xconfig + ' ' +\ infile + ' ' + pairsfile print('processing ' + os.path.split(infile)[1] + ' with ' + os.path.split(shape_path)[1]) try: rfile = open(outfile, 'w+') lfile = open(logfile, 'w+') p1 = sp.Popen(cmd1.split(' '), stdout=sp.PIPE) p2 = sp.Popen(cmd2.split(' '), stdin=p1.stdout, stdout=rfile, stderr=lfile) p1.stdout.close() p2.communicate() rfile.close() lfile.close() print('finished: ' + os.path.split(outfile)[1]) except KeyboardInterrupt: raise KeyboardInterrupt
[docs]def call_chunks(chunk): """Executes serial calculation for each chunk (simple for loop)""" try: for infodict in chunk: call_xshear(infodict) except KeyboardInterrupt: pass
[docs]def multi_xrun(infodicts, nprocess=1): """ OpenMP style parallelization for xshear tasks Separates tasks into chunks, and passes each chunk for an independent process for serial evaulation via :py:func:`call_chunks` Parameters ---------- infodict : dict A single list element returned from :py:func:`create_infodict` nprocess : int Number of processes (cores) to use. Maximum number is always set by ``len(infodicts)`` """ # at most as many processes can be used as there are independent tasks... if nprocess > len(infodicts): nprocess = len(infodicts) print('starting xshear calculations in ' + str(nprocess) + ' processes') fparchunks = partition(infodicts, nprocess) pool = mp.Pool(processes=nprocess) try: pp = pool.map_async(call_chunks, fparchunks) pp.get(172800) # apparently this counters a bug in the exception passing in python.subprocess... except KeyboardInterrupt: print("Caught KeyboardInterrupt, terminating workers") pool.terminate() pool.join() else: pool.close() pool.join()
############################################################### def write_profile(prof, path): """saves DeltaSigma and covariance in text format""" # Saving profile profheader = "R [Mpc]\tDeltaSigma_t [M_sun / pc^2]\tDeltaSigma_t_err [M_sun / pc^2]\tDeltaSigma_x [M_sun / pc^2]\tDeltaSigma_x_err [M_sun / pc^2]" res = np.vstack((prof.rr, prof.dst, prof.dst_err, prof.dsx, prof.dsx_err)).T fname = path + "_profile.dat" print("saving:", fname) np.savetxt(fname, res, header=profheader) # Saving covariance np.savetxt(path + "_dst_cov.dat", prof.dst_cov) np.savetxt(path + "_dsx_cov.dat", prof.dsx_cov) def get_calib_cont(): cont = OrderedDict([ ("scrit_inv", []), ("scrit_inv_err", []), ("zmean_clust", []), ("zmean_rands", []), ("delta_clust", []), ("delta_clust_err", []), ("delta_rands", []), ("delta_rands_err", []), ]) return cont def export_scrit_inv(prof): """Exports the Sigma_crit inverse from a profile container""" refprof = copy.deepcopy(prof) refprof.ismeta = False refprof.dst_nom = 6 refprof.dsx_nom = 7 refprof.dst_denom = 8 refprof.dsx_denom = 9 refprof.prof_maker() scrit_inv = refprof.dst[-1] scrit_inv_err = refprof.dst_err[-1] return scrit_inv, scrit_inv_err def append_scrit_inv(dcont, prof): scrit_inv, scrit_inv_err = export_scrit_inv(prof) dcont["scrit_inv"].append(scrit_inv) dcont["scrit_inv_err"].append(scrit_inv_err) return dcont def write_calib_cont(resname, ccont, bin_vals): calib_params = np.ones((len(bin_vals), 10)) * BADVAL header = ("lambda_bin\t z_bin\t" " sigma_crit_inv [pc^2 / M_\odot]\t sigma_crit_inv_err [pc^2 / M_\odot]" " zmean_clust\t zmean_rands\t" " delta_clust\t delta_clust_err\t delta_rands\t delta_rands_err\t") calib_params[:, :2] = bin_vals for i, key in enumerate(ccont.keys()): print(key) if ccont[key]: calib_params[:, 2 + i] = ccont[key] fmt = ["%d", ] * 2 + ["%.12f"] * 8 np.savetxt(resname, calib_params, fmt=fmt, header=header) """ dat = pickle.load(open("/home/moon/vargatn/DES/Y3_WORK/xpipe/bin/pipe-y1/ccont.p", "rb")) header = ("z_bin\t sigma_crit_inv [pc^2 / M_\odot]\t sigma_crit_inv_err [pc^2 / M_\odot]") fmt = ["%d", ] + ["%.12f"] * 2 BADVAL = -9999 calib_params = np.ones((3, 3)) * BADVAL calib_params[:, 0] = [0, 1, 2] calib_params[:, 1] = dat["scrit_inv"] calib_params[:, 2] = dat["scrit_inv_err"] np.savetxt("mustar_newcut_scritinv.dat", calib_params, fmt=fmt, header=header) """