diff --git a/src/pythium/common/tHbbCPTools.py b/src/pythium/common/tHbbCPTools.py new file mode 100644 index 0000000..59c1d20 --- /dev/null +++ b/src/pythium/common/tHbbCPTools.py @@ -0,0 +1,45 @@ +from pythium.common.user_tools import * + +def unpack_samples(sample_info,flist): + for group in sample_info.keys(): + for sample in sample_info[group].keys(): + info =sample_info[group][sample] + files = [] + for container in info['containers']: + with open(f'{flist}/{container}','r') as file: + lines = [line.strip() for line in file.readlines()] + lines = [line.replace('davs:','root:').replace('.uk:443/dpm','.uk//dpm') for line in lines] + files += lines + sample_info[group][sample]['files'] = files + return sample_info + +def leading_particle(particle): + return particle[ak.argmax(particle.pt,axis=-1,keepdims=True)][:,0] + +def dxbb(pH,pT,pQ,f): + return np.log(pH/(f*pT + (1-f)*pQ)) + +def overlap_removal(p1, p2, dR): + event = ak.zip({ + 'p1':p1, + 'p2':p2 + }) + return ak.any(DeltaR(event.p1,event.p2)>=dR,axis = -1) + +def overlap(p1, p2, dR): + event = ak.zip({ + 'p1':p1, + 'p2':p2 + }) + return (DeltaR(event.p1,event.p2)<=dR) + +def momentum_PtEtaPhiE(pt,eta,phi,e): + px = pt*np.cos(phi) + py = pt*np.sin(phi) + pz = pt*np.sinh(eta) + m = np.nan_to_num(np.sqrt(pow(e,2) - (pow(px,2)+pow(py,2)+pow(pz,2)))) + return momentum_4d(px,py,pz,m) + + + + diff --git a/src/pythium/sklimming/reader.py b/src/pythium/sklimming/reader.py index 2652c8e..d8d47d9 100644 --- a/src/pythium/sklimming/reader.py +++ b/src/pythium/sklimming/reader.py @@ -1,4 +1,4 @@ -''' +''' author: Mohamed Aly (maly@cern.ch) Brief: Functions to set up file paths and read sample data into awkward arrays using uproot methods ''' @@ -6,11 +6,11 @@ #=========== File Reading Imports import uproot4 as uproot from pathlib import Path -import awkward as ak +import awkward as ak import numpy as np #============== System and Python Imports from typing import List, Dict, Tuple -import glob, os +import os import re import time import psutil @@ -22,7 +22,13 @@ from pythium.sklimming import writer from pythium.common import tools - +REMOTE_GLOBE_IMPORTED = False +try: + import XRootD.client as client + import XRootD.client.glob_funcs as glob + REMOTE_GLOBE_IMPORTED = True +except ImportError: + import glob CfgType = Dict[str, Dict[str, Union[str,bool,int,float]]] SampleDataType = Dict[str, List[Union[str,float,int]]] @@ -30,6 +36,21 @@ CutFuncArgsType = Dict[str,List[Union[str,float,int]]] BranchStatusType = Dict[str,Dict[str,List[Union["Branch",str]]]] +def get_file_size(file: str)->float: + ''' + Find the size of a file in GB from its filename + Args: + file: A string containing a filename + Return: + A float of the size of the file in GB + ''' + if REMOTE_GLOBE_IMPORTED: + with client.File() as f: + f.open(file) + info=f.stat() + return info[1].size*1e-9 + else: + return os.path.getsize(file)*1e-9 def decorate_sample_tag(tags:List[str])->List[str]: ''' @@ -39,7 +60,7 @@ def decorate_sample_tag(tags:List[str])->List[str]: Return: a list with decorated elements ''' - return [f'*{tag}*' for tag in tags] + return [f'{tag}' for tag in tags] def make_sample_path(locations: List[Path], tags:List[str]) -> List[str]: @@ -51,7 +72,7 @@ def make_sample_path(locations: List[Path], tags:List[str]) -> List[str]: Return: paths: A list of full paths up to sample tags ''' - paths = [str(loc)+'/'+tag for tag in tags for loc in locations] + paths = [((str(loc)+'/') if str(loc) != '.' else '') +tag for tag in tags for loc in locations] dirpaths = [p+'/.*' for path in paths for p in glob.glob(path) if os.path.isdir(p)] fpaths = list(set([path for path in paths for p in glob.glob(path) if not os.path.isdir(p) ])) paths = dirpaths+fpaths @@ -83,66 +104,67 @@ def process_sample(sample: "Sample", cfg: CfgType)->Dict[str, "ak.Array"]: on_branches = [branch for branch in branch_list if branch.branch_type == 'on'] new_branches = [branch for branch in branch_list if branch.branch_type == 'new'] tmp_branches = [branch for branch in branch_list if branch.drop is True] - # required branch names to compute new branches -- not Branch objects. - req_branches = [arg for branch in new_branches - for idx, arg in enumerate(branch.alg_args) - if (branch.alg_arg_types[idx]== Branch + # required branch names to compute new branches -- not Branch objects. + req_branches = [arg for branch in new_branches + for idx, arg in enumerate(branch.alg_args) + if (branch.alg_arg_types[idx]== Branch and (branch.args_from is None or branch.args_from[idx] == tree) and arg not in branch_write_names)] - + branch_names = list(set(on_branch_names+req_branches)) trees_to_branch_names[tree] = branch_names trees_to_status_to_branches[tree]['on'] = on_branches trees_to_status_to_branches[tree]['req'] = req_branches trees_to_status_to_branches[tree]['new'] = new_branches trees_to_status_to_branches[tree]['tmp'] = tmp_branches - + if sample.selec is not None: # add branches needed just for cuts to req_branches for tree, args in sample.selec.args.items(): branch_write_names = tree_to_branch_write_names[tree] req_branches = [arg for arg in args if isinstance(arg, str) and arg not in branch_write_names] trees_to_status_to_branches[tree]['req'] += req_branches - + sample_yield = run_workflow(paths, trees_to_branch_names, sample, cfg, trees_to_status_to_branches) - + outdir = cfg["settings"]["outdir"] with open(f"{outdir}/Yields.csv", "a+") as yf: yf.write(f"{sample.name},{sample_yield}\n") - + def run_workflow(paths: List[str], trees_to_branch_names: Dict[str, str], sample: "Sample", cfg: CfgType, trees_to_status_to_branches: BranchStatusType)->Dict[str,"ak.Array"]: ''' - Method responsible for running the pre-processing workflow, starting from file reading, + Method responsible for running the pre-processing workflow, starting from file reading, followed by manipulating input then writing out data. The workflow cycle is done on chunks of files Args: paths: the list of paths to look for sample trees_to_branch_names: A map from tree names to branches to be extracted from tree - sample: The Sample object being procssed + sample: The Sample object being procssed cfg: A dictionary carrying all settings specified in config file - trees_to_status_to_branches: Map for each sample from tree to various status + trees_to_status_to_branches: Map for each sample from tree to various status of branches to branches that match that status. Status can be on, off, new, req, tmp. Return: - None + None ''' skip_missing_files = cfg['settings']['skipmissingfiles'] skip_missing_sample = cfg['settings']['skipmissingsamples'] tag = sample.tag out_idx = 0 - for path in paths: logger.info(f'Processing data from the following path: \n {path}') + files = glob.glob(path) # Get list of files matching path regex + if len(files) == 0: logger.warning(f"No files were found in {path}, skipping") - continue + continue try: # split files into list(list) where inner list of files total size < 4GB data_chunks = chunk_files(files) # int to keep track of how many chunks were processed smoothly done=0 # Loop over groups of 4GBs file sets - for chunk_index, chunk in enumerate(data_chunks): + for chunk_index, chunk in enumerate(data_chunks): logger.info(f"Reading data from with uproot for chunk {chunk_index+1}/{len(data_chunks)}") exception = 0 # While there are still exceptions in reading the file, remove file causing exception and re-read @@ -150,40 +172,42 @@ def run_workflow(paths: List[str], trees_to_branch_names: Dict[str, str], sample chunk_data, exception = read(chunk, trees_to_branch_names, skip_missing_sample) # If all files were read succesfully break before code tries to find errors if exception is None: break - broken_file = handle_exception(exception, skip_missing_files) + broken_file = handle_exception(exception, skip_missing_files) # if exception handled remove file with missing key chunk.remove(broken_file) # re-read rest of files. If exception is still not None, while loop starts over to remove next file chunk_data, exception = read(chunk,trees_to_branch_names, skip_missing_sample) logger.info(f"Manipulating chunk data") chunk_data = finalize(chunk_data, sample, trees_to_status_to_branches) - + if len(chunk_data)!=0: logger.info(f"Writing chunk data") outfile = writer.write_sample(chunk_data, sample, cfg , suffix='__chunk'+str(out_idx)) out_idx += 1 done += 1 - + # Clean up memory del chunk_data gc.collect() + + #logger.info(f"Storing data memory_info().rss (GB) = {psutil.Process(os.getpid()).memory_info().rss / 1024 ** 3}") else: logger.warning(f"No data from chunk, not written out") if done == 0 and not skip_missing_sample: logger.error(f"No data saved from any chunks. Are you cutting too harshly?") elif done == 0 and skip_missing_sample: logger.warning(f"No data saved from any chunks. Are you cutting too harshly?") - + # If some other error was found which is not caught by the process() function, display it and break except Exception as not_uproot_except: raise not_uproot_except - + return None def chunk_files(files: List[str]) -> List[List[str]]: ''' Method to split a list of file to a list of lists with inner lists - total size of files < 4GB. + total size of files < 4GB. Args: files: The list of file paths to chunk up @@ -191,7 +215,7 @@ def chunk_files(files: List[str]) -> List[List[str]]: list of lists of file paths ''' limit = 2. - files_iter, filesizes = iter(files), iter([os.path.getsize(file)*1e-9 for file in files]) + files_iter, filesizes = iter(files), iter([get_file_size(file) for file in files]) f, fs = next(files_iter), next(filesizes) chunks = [] while f is not None: @@ -200,7 +224,7 @@ def chunk_files(files: List[str]) -> List[List[str]]: chunksize += fs if (fs > limit and chunk != []) or (chunksize > limit and fs <= limit): chunksize -= fs - break + break chunk.append(f) fs = next(filesizes, None) f = next(files_iter, None) @@ -217,9 +241,9 @@ def read(chunk_files: List[str], trees_to_branch_names: Dict[str, str], skipsamp Args: file_list: A list of files whose size add up to <= 4GBs trees_to_branch_names: A map from tree names to branches to be extracted from tree - Return + Return If no exception arises: - ({tree: ak.Array}, None) + ({tree: ak.Array}, None) else: (None, exception) @@ -229,7 +253,7 @@ def read(chunk_files: List[str], trees_to_branch_names: Dict[str, str], skipsamp if len(chunk_files) == 0: if not skipsample: logger.error("Chunk of files is empty. If files have been skipped, this might mean all files could not be read") else: return {}, None - + chunk_data = {} for tree, branches in trees_to_branch_names.items(): start_t = time.time() @@ -288,7 +312,7 @@ def handle_exception(exception: Exception, skip_missing_files: bool)->Optional[s # If some other excpetion where there is no specific file, break code else: raise exception - logger.error(exception) + logger.error(exception) return broken_file @@ -298,14 +322,14 @@ def finalize(chunk_data: "Dict[str,ak.Array]", sample: "Sample", trees_to_status Args: chunk_data: A dict from tree to awkward array of data from one chunk sample: The Sample object being processed - trees_to_status_to_branches: Map for each sample from tree to various status + trees_to_status_to_branches: Map for each sample from tree to various status of branches to branches that match that status. Status can be on, off, new, req, tmp. Return: chunk_data: Same as arg, but after manipulation ''' - if len(chunk_data) == 0: return {} - + if len(chunk_data) == 0: return {} + chunk_data = add_branches(chunk_data, trees_to_status_to_branches) if sample.selec is not None: chunk_data = apply_cuts(chunk_data, sample.selec) @@ -320,33 +344,33 @@ def add_branches(trees_to_data: SampleDataType, trees_to_status_to_branches: Bra Function to create new branches that user has requested or to simply rename branches that were read from input to the name assigned to them by user. This step must be ran before the `apply_cuts` method since the user can request cuts to be applied on new branches or on the - given name to branches read from inputs. + given name to branches read from inputs. Args: trees_to_data: Map for each sample from tree to all the data read from that tree - trees_to_status_to_branches: Map for each sample from tree to various status + trees_to_status_to_branches: Map for each sample from tree to various status of branches to branches that match that status. Status can be on, off, new, req, tmp. Return - Map for each sample from tree to all the data read from that tree after filters are applied - ''' + Map for each sample from tree to all the data read from that tree after filters are applied + ''' - for tree, data in trees_to_data.items(): + for tree, data in trees_to_data.items(): new_branches = trees_to_status_to_branches[tree]['new'] - on_branches = trees_to_status_to_branches[tree]['on'] + on_branches = trees_to_status_to_branches[tree]['on'] for branch in on_branches: data[branch.write_name] = data[branch.alg] # this creates new branch, unless keys match - if branch.write_name != branch.alg: # don't drop the branch if it's name hasn't changed + if branch.write_name != branch.alg: # don't drop the branch if it's name hasn't changed data = data[[x for x in ak.fields(data) if x != branch.alg]] data = create_new_branches(data, new_branches) - trees_to_data[tree] = data - - # Loop again to compute new branches that need args from different trees. + trees_to_data[tree] = data + + # Loop again to compute new branches that need args from different trees. for tree, data in trees_to_data.items(): new_branches = trees_to_status_to_branches[tree]['new'] on_branches = trees_to_status_to_branches[tree]['on'] data = create_new_branches(data, new_branches, mix_trees=True, trees_to_data=trees_to_data) trees_to_data[tree] = data - + trees_to_data = {tree:data for tree,data in trees_to_data.items() if data is not None} return trees_to_data @@ -354,27 +378,27 @@ def add_branches(trees_to_data: SampleDataType, trees_to_status_to_branches: Bra def create_new_branches(data: "ak.Array", new_branches: List["Branch"], mix_trees: bool = False, trees_to_data: Optional[SampleDataType] = None): ''' Function to compute new branches, taking care of the case when new branches need - other new branches to be computed first to be then passed as args to the algo. + other new branches to be computed first to be then passed as args to the algo. Args: data: Awkward array of data for one tree -- can be thought of as a dict - new_branches: A list of Branch objects that need to be computed + new_branches: A list of Branch objects that need to be computed mix_trees: Flag to determine if branches to be computed from multiple trees are allowed - trees_to_data: Map for each sample from tree to all the data read from that tree + trees_to_data: Map for each sample from tree to all the data read from that tree (only needed to compute branches from multiple trees) - + Return Awkward array of data for one tree with new branches added - ''' - new_branches_from_others: List["Branch"] = [] + ''' + new_branches_from_others: List["Branch"] = [] for branch in new_branches: # if we need other new branches to compute this new branch, save it for later if len(set([br.write_name for br in new_branches]).intersection(set(branch.alg_args)))!=0: new_branches_from_others.append(branch) continue - # if branches computed from multiple trees is not allowed + # if branches computed from multiple trees is not allowed if not mix_trees: # (initial call to this method comes here) if branch.args_from is None: # check the branch doesn't need args from multiple trees - # Get args and execute the new beanch algo + # Get args and execute the new beanch algo args = [data[arg] if branch.alg_arg_types[i] == Branch else arg for i, arg in enumerate(branch.alg_args)] if branch.isprop: if len(args)>1: @@ -384,13 +408,13 @@ def create_new_branches(data: "ak.Array", new_branches: List["Branch"], mix_tree data[branch.write_name] = getattr(arg, branch.alg) else: data[branch.write_name] = branch.alg(*args) - else: + else: if branch.args_from is not None: # only care to process branches from mixed trees - args_from = branch.args_from # get tree names - # get branches from trees and execute new branch algo + args_from = branch.args_from # get tree names + # get branches from trees and execute new branch algo args = [trees_to_data[args_from[i]][arg] if branch.alg_arg_types[i] == Branch else arg for i, arg in enumerate(branch.alg_args)] data[branch.write_name] = branch.alg(*args) - # If we have anu branches that are saved for later proessing, + # If we have anu branches that are saved for later proessing, # re-run the method on them with the updated data containing new branches if(len(new_branches_from_others)!=0): try: @@ -398,7 +422,7 @@ def create_new_branches(data: "ak.Array", new_branches: List["Branch"], mix_tree except RecursionError: logger.error(f'Failed to build your branches due to recursion. Check all branch input and output names are not the same. \n \ Check {[br.write_name for br in new_branches_from_others]}') - + return data @@ -406,16 +430,16 @@ def apply_cuts(trees_to_data: SampleDataType, sel: CutFuncType)-> SampleDataType ''' Function to filter data based on sample selection cuts provided in the configuration file. The branches specified to be cut on must have names specified as `sample.write_name` -- not the name in the input, unless - both match. + both match. Args: trees_to_data: Map for each sample from tree to all the data read from that tree cuts: A function to be called with awkward arrays as args in order to filter the data - cut_args: A map for each sample from tree to the branches (or tree-independent objects like floats) + cut_args: A map for each sample from tree to the branches (or tree-independent objects like floats) needed to filter the data to be passed as arguments to the `cuts` function Return - Map for each sample from tree to all the data read from that tree after filters are applied + Map for each sample from tree to all the data read from that tree after filters are applied - ''' + ''' cut_args = sel.args cuts = sel.func for tree, args in cut_args.items(): @@ -431,22 +455,22 @@ def apply_cuts(trees_to_data: SampleDataType, sel: CutFuncType)-> SampleDataType def drop_branches(trees_to_data: SampleDataType, trees_to_status_to_branches: BranchStatusType): ''' - Function to drop branches that user did not ask to save. These can be + Function to drop branches that user did not ask to save. These can be 1. Branches saved to be used in creating new branches 2. Branches with the drop attribute activated by user in config - 3. Branches needed to apply selection cuts on them + 3. Branches needed to apply selection cuts on them Args: trees_to_data: Map for each sample from tree to all the data read from that tree - trees_to_status_to_branches: Map for each sample from tree to various status + trees_to_status_to_branches: Map for each sample from tree to various status of branches to branches that match that status. Status can be on, off, new, req, tmp. Return - Map for each sample from tree to all the data read from that tree after filters are applied - ''' + Map for each sample from tree to all the data read from that tree after filters are applied + ''' for tree, data in trees_to_data.items(): new_branches = trees_to_status_to_branches[tree]['new'] req_branches = trees_to_status_to_branches[tree]['req'] - tmp_branches = trees_to_status_to_branches[tree]['tmp'] + tmp_branches = trees_to_status_to_branches[tree]['tmp'] for branch in req_branches: # these are just branch names data = data[[x for x in ak.fields(data) if x != branch]] for branch in tmp_branches: diff --git a/src/pythium/sklimming/writer.py b/src/pythium/sklimming/writer.py index c81e218..bc45354 100644 --- a/src/pythium/sklimming/writer.py +++ b/src/pythium/sklimming/writer.py @@ -1,4 +1,4 @@ -''' +''' author: Mohamed Aly (maly@cern.ch) Brief: Functions to take in awkward arrays and set them out to be dumped to various formats @@ -21,7 +21,7 @@ def write_sample(sample_data, sample, cfg, ext='H5', suffix=''): ext = cfg['settings']['dumptoformat'] outdir.mkdir(parents=True, exist_ok=True) outfile = f"{outdir}/{sample_name}{suffix}" - if ext == 'h5': + if ext == 'h5': for tree, data in sample_data.items(): outfile += f"__{tree}.h5" with h5py.File(outfile, "w") as file: @@ -33,16 +33,17 @@ def write_sample(sample_data, sample, cfg, ext='H5', suffix=''): group.attrs["sel"] = "MySel" outfile = outfile.replace(f"__{tree}.h5", "") elif ext == 'root': - pass + pass elif ext == 'parquet': for tree, data in sample_data.items(): outfile += f"__{tree}.parquet" + data = ak.drop_none(data,axis=0) ak.to_parquet(data, outfile) - outfile = outfile.replace(f"__{tree}.parquet", "") + outfile = outfile.replace(f"__{tree}.parquet", "") elif ext == "json": for tree, data in sample_data.items(): outfile += f"__{tree}.json" ak.to_json(data, outfile) outfile = outfile.replace(f"__{tree}.json", "") - - return outfile \ No newline at end of file + + return outfile