diff --git a/MC/bin/o2dpg_sim_workflow_anchored.py b/MC/bin/o2dpg_sim_workflow_anchored.py index 9a7002bf2..31d8ffb1c 100755 --- a/MC/bin/o2dpg_sim_workflow_anchored.py +++ b/MC/bin/o2dpg_sim_workflow_anchored.py @@ -394,6 +394,29 @@ def determine_timestamp(sor, eor, splitinfo, cycle, ntf, HBF_per_timeframe = 256 assert (timestamp_of_production <= eor) return int(timestamp_of_production), production_offset +def determine_timestamp_from_timeframeID(sor, eor, timeframeID, HBF_per_timeframe = 256): + """ + Determines the timestamp based on the given timeframeID within a run + Args: + sor: int + start-of-run in milliseconds since epoch + eor: int + end-of-run in milliseconds since epoch + timeframeID: int + timeframe id + HBF_per_timeframe: int + number of orbits per timeframe + Returns: + int: timestamp in milliseconds + """ + # length of this run in micro seconds, since we use the orbit duration in micro seconds + + timestamp_of_production = sor + timeframeID * HBF_per_timeframe * LHCOrbitMUS / 1000 + # this is a closure test. If we had perfect floating point precision everywhere, it wouldn't fail. + # But since we don't have that and there are some int casts as well, better check again. + assert (timestamp_of_production >= sor) + assert (timestamp_of_production <= eor) + return int(timestamp_of_production) def exclude_timestamp(ts, orbit, run, filename, global_run_params): """ @@ -481,6 +504,7 @@ def main(): parser.add_argument("--invert-irframe-selection", action='store_true', help="Inverts the logic of --run-time-span-file") parser.add_argument("--orbitsPerTF", type=str, help="Force a certain orbits-per-timeframe number; Automatically taken from CCDB if not given.", default="") parser.add_argument('--publish-mcprodinfo', action='store_true', default=False, help="Publish MCProdInfo metadata to CCDB") + parser.add_argument('--timeframeID', type=int, help="If given, anchor to this specific timeframe id within a run. 
Takes precedence over determination based on (split-id, prod-split, cycle)
./anchorMC.sh" + echo + echo "This needs O2 and O2DPG loaded from alienv." + echo + echo "Make sure the following env variables are set:" + echo "ALIEN_JDL_LPMANCHORPASSNAME or ANCHORPASSNAME," + echo "ALIEN_JDL_MCANCHOR or MCANCHOR," + echo "ALIEN_JDL_LPMPASSNAME or PASSNAME," + echo "ALIEN_JDL_LPMRUNNUMBER or RUNNUMBER," + echo "ALIEN_JDL_LPMPRODUCTIONTYPE or PRODUCTIONTYPE," + echo "ALIEN_JDL_LPMINTERACTIONTYPE or INTERACTIONTYPE," + echo "ALIEN_JDL_LPMPRODUCTIONTAG or PRODUCTIONTAG," + echo "ALIEN_JDL_LPMANCHORRUN or ANCHORRUN," + echo "ALIEN_JDL_LPMANCHORPRODUCTION or ANCHORPRODUCTION," + echo "ALIEN_JDL_LPMANCHORYEAR or ANCHORYEAR," + echo + echo "as well as:" + echo "NTIMEFRAMES," + echo "SPLITID," + echo "PRODSPLIT." + echo + echo "Optional are:" + echo "ALIEN_JDL_CPULIMIT or CPULIMIT, set the CPU limit of the workflow runner, default: 8," + echo "NWORKERS, set the number of workers during detector transport, default: 8," + echo "ALIEN_JDL_SIMENGINE or SIMENGINE, choose the transport engine, default: TGeant4," + echo "ALIEN_JDL_WORKFLOWDETECTORS, set detectors to be taken into account, default: not-used (take the ones from async-reco)" + echo "ALIEN_JDL_ANCHOR_SIM_OPTIONS, additional options that are passed to the workflow creation, default: -gen pythia8," + echo "ALIEN_JDL_ADDTIMESERIESINMC, run TPC time series. Default: 1, switch off by setting to 0," + echo "ALIEN_JDL_MC_ORBITS_PER_TF=N, enforce some orbits per timeframe, instead of determining from CCDB" + echo "ALIEN_JDL_RUN_TIME_SPAN_FILE=FILE, use a run-time-span file to exclude bad data-taking periods" + echo "ALIEN_JDL_INVERT_IRFRAME_SELECTION, invertes the choice of ALIEN_JDL_RUN_TIME_SPAN_FILE" + echo "ALIEN_JDL_CCDB_CONDITION_NOT_AFTER, sets the condition_not_after timestamp for CCDB queries" + echo "DISABLE_QC, set this to disable QC, e.g. 
to 1" + echo "CYCLE, to set a cycle number different than 0" + echo "NSIGEVENTS, to enforce a specific upper limit of events in a timeframe (not counting orbit-early) events" +} + +# Prevent the script from being soured to omit unexpected surprises when exit is used +SCRIPT_NAME="$(basename "$(test -L "$0" && readlink "$0" || echo "$0")")" +if [ "${SCRIPT_NAME}" != "$(basename ${BASH_SOURCE[0]})" ] ; then + echo_error "This script cannot not be sourced" >&2 + return 1 +fi + +while [ "$1" != "" ] ; do + case $1 in + --help|-h ) shift + print_help + exit 0 + ;; + * ) echo "Unknown argument ${1}" + exit 1 + ;; + esac +done + +# make sure O2DPG + O2 is loaded +[ ! "${O2DPG_ROOT}" ] && echo_error "This needs O2DPG loaded" && exit 1 +[ ! "${O2_ROOT}" ] && echo_error "This needs O2 loaded" && exit 1 + +# check if jq is there +which jq >/dev/null 2>&1 +[ "${?}" != "0" ] && { echo_error "jq is not found. Install or load via alienv." ; exit 1 ; } + +alien-token-info >/dev/null 2>&1 +[ "${?}" != "0" ] && { echo_error "No GRID token found, required to run." 
; exit 1 ; } + +################################################################# +# Set all required variables to identify an anchored production # +################################################################# + +# Allow for both "ALIEN_JDL_LPM" as well as "KEY" + +# the only four where there is a real default for +export ALIEN_JDL_CPULIMIT=${ALIEN_JDL_CPULIMIT:-${CPULIMIT:-8}} +export ALIEN_JDL_SIMENGINE=${ALIEN_JDL_SIMENGINE:-${SIMENGINE:-TGeant4}} +# can be passed to contain additional options that will be passed to o2dpg_sim_workflow_anchored.py and eventually to o2dpg_sim_workflow.py +export ALIEN_JDL_ANCHOR_SIM_OPTIONS=${ALIEN_JDL_ANCHOR_SIM_OPTIONS:--gen pythia8} +# all others MUST be set by the user/on the outside +export ALIEN_JDL_LPMANCHORPASSNAME=${ALIEN_JDL_LPMANCHORPASSNAME:-${ANCHORPASSNAME}} +# LPMPASSNAME is used in O2 and O2DPG scripts, however on the other hand, ALIEN_JDL_LPMANCHORPASSNAME is the one that is set in JDL templates; so use ALIEN_JDL_LPMANCHORPASSNAME and set ALIEN_JDL_LPMPASSNAME +export ALIEN_JDL_LPMPASSNAME=${ALIEN_JDL_LPMANCHORPASSNAME} +export ALIEN_JDL_LPMRUNNUMBER=${ALIEN_JDL_LPMRUNNUMBER:-${RUNNUMBER}} +export ALIEN_JDL_LPMPRODUCTIONTYPE=${ALIEN_JDL_LPMPRODUCTIONTYPE:-${PRODUCTIONTYPE}} +export ALIEN_JDL_LPMINTERACTIONTYPE=${ALIEN_JDL_LPMINTERACTIONTYPE:-${INTERACTIONTYPE}} +export ALIEN_JDL_LPMPRODUCTIONTAG=${ALIEN_JDL_LPMPRODUCTIONTAG:-${PRODUCTIONTAG}} +export ALIEN_JDL_LPMANCHORRUN=${ALIEN_JDL_LPMANCHORRUN:-${ANCHORRUN}} +export ALIEN_JDL_LPMANCHORPRODUCTION=${ALIEN_JDL_LPMANCHORPRODUCTION:-${ANCHORPRODUCTION}} +export ALIEN_JDL_LPMANCHORYEAR=${ALIEN_JDL_LPMANCHORYEAR:-${ANCHORYEAR}} +# decide whether to run TPC time series; on by default, switched off by setting to 0 +export ALIEN_JDL_ADDTIMESERIESINMC=${ALIEN_JDL_ADDTIMESERIESINMC:-1} + +# check for presence of essential variables that need to be set +[ -z "${ALIEN_JDL_LPMANCHORPASSNAME}" ] && { echo_error "Set ALIEN_JDL_LPMANCHORPASSNAME or ANCHORPASSNAME" ; exit 
1 ; } +[ -z "${ALIEN_JDL_LPMRUNNUMBER}" ] && { echo_error "Set ALIEN_JDL_LPMRUNNUMBER or RUNNUMBER" ; exit 1 ; } +[ -z "${ALIEN_JDL_LPMPRODUCTIONTYPE}" ] && { echo_error "Set ALIEN_JDL_LPMPRODUCTIONTYPE or PRODUCTIONTYPE" ; exit 1 ; } +[ -z "${ALIEN_JDL_LPMINTERACTIONTYPE}" ] && { echo_error "Set ALIEN_JDL_LPMINTERACTIONTYPE or INTERACTIONTYPE" ; exit 1 ; } +[ -z "${ALIEN_JDL_LPMPRODUCTIONTAG}" ] && { echo_error "Set ALIEN_JDL_LPMPRODUCTIONTAG or PRODUCTIONTAG" ; exit 1 ; } +[ -z "${ALIEN_JDL_LPMANCHORRUN}" ] && { echo_error "Set ALIEN_JDL_LPMANCHORRUN or ANCHORRUN" ; exit 1 ; } +[ -z "${ALIEN_JDL_LPMANCHORPRODUCTION}" ] && { echo_error "Set ALIEN_JDL_LPMANCHORPRODUCTION or ANCHORPRODUCTION" ; exit 1 ; } +[ -z "${ALIEN_JDL_LPMANCHORYEAR}" ] && { echo_error "Set ALIEN_JDL_LPMANCHORYEAR or ANCHORYEAR" ; exit 1 ; } + +[ -z "${NTIMEFRAMES}" ] && { echo_error "Set NTIMEFRAMES" ; exit 1 ; } +[ -z "${SPLITID}" ] && { echo_error "Set SPLITID" ; exit 1 ; } +[ -z "${PRODSPLIT}" ] && { echo_error "Set PRODSPLIT" ; exit 1 ; } + + +# cache the production tag, will be set to a special anchor tag; reset later in fact +ALIEN_JDL_LPMPRODUCTIONTAG_KEEP=$ALIEN_JDL_LPMPRODUCTIONTAG +echo_info "Substituting ALIEN_JDL_LPMPRODUCTIONTAG=$ALIEN_JDL_LPMPRODUCTIONTAG with ALIEN_JDL_LPMANCHORPRODUCTION=$ALIEN_JDL_LPMANCHORPRODUCTION for simulating reco pass..." +ALIEN_JDL_LPMPRODUCTIONTAG=$ALIEN_JDL_LPMANCHORPRODUCTION + +if [[ $ALIEN_JDL_ANCHOR_SIM_OPTIONS == *"--tpc-distortion-type 2"* ]]; then + export O2DPG_ENABLE_TPC_DISTORTIONS=ON + # set the SCALING SOURCE to CTP for MC unless explicitely given from outside + export ALIEN_JDL_TPCSCALINGSOURCE=${ALIEN_JDL_TPCSCALINGSOURCE:-"CTP"} +fi + + +# The number of signal events can be given, but should be useful only in +# certain expert modes. In the default case, the final event number is determined by the timeframe length. 
+if [ -z "${NSIGEVENTS}" ]; then + NSIGEVENTS=10000 # this is just some big number; In the simulation the event number is the minimum of this number and what fits into a single timeframe + # based on the interaction rate. The number is a reasonable upper limit related to ~5696 collisions that fit into 32 LHC orbits at 2MHz interaction rate. +fi + +if [ -z "${CYCLE}" ]; then + echo_info "No CYCLE number given ... defaulting to 0" + CYCLE=0 +fi + +# this generates an exact reproducer script for this job +# that can be used locally for debugging etc. +if [[ -n "${ALIEN_PROC_ID}" && -n "${JALIEN_WSPORT}" ]]; then + ${O2DPG_ROOT}/GRID/utils/getReproducerScript.sh ${ALIEN_PROC_ID} +fi + +# also for this keep a real default +NWORKERS=${NWORKERS:-8} +# set a default seed if not given +SEED=${ALIEN_PROC_ID:-${SEED:-1}} + +ONCVMFS=0 + +if [ "${ALIEN_JDL_O2DPG_OVERWRITE}" ]; then + echo "Setting O2DPG_ROOT to overwritten path ${ALIEN_JDL_O2DPG_OVERWRITE}" + export O2DPG_ROOT=${ALIEN_JDL_O2DPG_OVERWRITE} +fi + +export > env_base.env + +if ! declare -F module > /dev/null; then + module() { + eval "$(/usr/bin/modulecmd bash "$@")"; + } + export -f module +fi + +[[ "${BASEDIR}" == /cvmfs/* ]] && ONCVMFS=1 +if [ ! 
"${MODULEPATH}" ]; then + export MODULEPATH=${BASEDIR}/../Modules/modulefiles + if [ "${ONCVMFS}" == "1" ]; then + PLATFORM=$(echo "${BASEDIR}" | sed -E 's|.*/([^/]+)/Packages|\1|') + export MODULEPATH=${MODULEPATH}:${BASEDIR}/../../etc/toolchain/modulefiles/${PLATFORM} + fi + echo "Determined Modulepath to be ${MODULEPATH}" +fi + +#<----- START OF part that should run under a clean alternative software environment if this was given ------ +if [ "${ALIEN_JDL_O2DPG_ASYNC_RECO_TAG}" ]; then + if [ "${LOADEDMODULES}" ]; then + printenv > env_before_stashing.printenv + echo "Stashing initial modules" + module save initial_modules.list # we stash the current modules environment + module list --no-pager + module purge --no-pager + printenv > env_after_stashing.printenv + echo "Modules after purge" + module list --no-pager + fi + echo_info "Using tag ${ALIEN_JDL_O2DPG_ASYNC_RECO_TAG} to setup anchored MC" + /cvmfs/alice.cern.ch/bin/alienv printenv "${ALIEN_JDL_O2DPG_ASYNC_RECO_TAG}" &> async_environment.env + source async_environment.env + export > env_async.env +fi + +# default async_pass.sh script +DPGRECO=$O2DPG_ROOT/DATA/production/configurations/asyncReco/async_pass.sh +# default destenv_extra.sh script +DPGSETENV=$O2DPG_ROOT/DATA/production/configurations/asyncReco/setenv_extra.sh + +# a specific async_pass.sh script is in the current directory, assume that one should be used +if [[ -f async_pass.sh ]]; then + # the default is executable, however, this may not be, so make it so + chmod +x async_pass.sh + DPGRECO=./async_pass.sh +else + cp -v $DPGRECO . +fi + +# if there is no setenv_extra.sh in this directory (so no special version is "shipped" with this rpodcution), copy the default one +if [[ ! -f setenv_extra.sh ]] ; then + cp ${DPGSETENV} . + echo_info "Use default setenv_extra.sh from ${DPGSETENV}." +else + echo_info "setenv_extra.sh was found in the current working directory, use it." 
+fi + +chmod u+x setenv_extra.sh + +echo_info "Setting up DPGRECO to ${DPGRECO}" + +# take out line running the workflow (if we don't have data input) +[ ${CTF_TEST_FILE} ] || sed -i '/WORKFLOWMODE=run/d' async_pass.sh + +# create workflow ---> creates the file that can be parsed +export IGNORE_EXISTING_SHMFILES=1 +touch list.list + +# run the async_pass.sh and store output to log file for later inspection and extraction of information +./async_pass.sh ${CTF_TEST_FILE:-""} 2&> async_pass_log.log +RECO_RC=$? + +echo_info "async_pass.sh finished with ${RECO_RC}" + +if [[ "${RECO_RC}" != "0" ]] ; then + exit ${RECO_RC} +fi + +# check that workflowconfig.log was created correctly +if [[ ! -f workflowconfig.log ]]; then + echo "Workflowconfig.log file not found" + exit 1 +fi + +export ALIEN_JDL_LPMPRODUCTIONTAG=$ALIEN_JDL_LPMPRODUCTIONTAG_KEEP +echo_info "Setting back ALIEN_JDL_LPMPRODUCTIONTAG to $ALIEN_JDL_LPMPRODUCTIONTAG" + +# get rid of the temporary software environment +if [ "${ALIEN_JDL_O2DPG_ASYNC_RECO_TAG}" ]; then + module purge --no-pager + # restore the initial software environment + echo "Restoring initial environment" + module --no-pager restore initial_modules.list + module saverm initial_modules.list + + # Restore overwritten O2DPG variables set by modules but changed by user + # (in particular custom O2DPG_ROOT and O2DPG_MC_CONFIG_ROOT) + printenv > env_after_restore.printenv + comm -12 <(grep '^O2DPG' env_before_stashing.printenv | cut -d= -f1 | sort) \ + <(grep '^O2DPG' env_after_restore.printenv | cut -d= -f1 | sort) | + while read -r var; do + b=$(grep "^$var=" env_before_stashing.printenv | cut -d= -f2-) + a=$(grep "^$var=" env_after_restore.printenv | cut -d= -f2-) + [[ "$b" != "$a" ]] && export "$var=$b" && echo "Reapplied: $var to ${b}" + done +fi +#<----- END OF part that should run under a clean alternative software environment if this was given ------ + +# now create the local MC config file --> config-json.json +# we create the new config 
output with blacklist functionality +ASYNC_CONFIG_BLACKLIST=${ASYNC_CONFIG_BLACKLIST:-${O2DPG_ROOT}/MC/run/ANCHOR/anchor-dpl-options-blacklist.json} +${O2DPG_ROOT}/MC/bin/o2dpg_dpl_config_tools.py workflowconfig.log ${ASYNC_CONFIG_BLACKLIST} config-json.json +ASYNC_WF_RC=${?} + +# check if config reasonably created +if [[ "${ASYNC_WF_RC}" != "0" || `grep "ConfigParams" config-json.json 2> /dev/null | wc -l` == "0" ]]; then + echo_error "Problem in anchor config creation. Exiting." + exit 1 +fi + +# -- CREATE THE MC JOB DESCRIPTION ANCHORED TO RUN -- + +MODULES="--skipModules ZDC" + +# publish MCPRODINFO for first few jobs of a production +# if external script exported PUBLISH_MCPRODINFO, it will be published anyways +if [ -z "$PUBLISH_MCPRODINFO" ] && [ "$SPLITID" -lt 20 ]; then + PUBLISH_MCPRODINFO_OPTION="--publish-mcprodinfo" + echo "Will publish MCProdInfo" +else + echo "Will not publish MCProdInfo" +fi +PUBLISH_MCPRODINFO_OPTION="" + +# let's take the input data AO2D from a JDL variable +AOD_DATA_FILE=${ALIEN_JDL_MC_DATA_EMBEDDING_AO2D} +if [ "${ALIEN_JDL_MC_DATA_EMBEDDING_AO2D}" ]; then + NTIMEFRAMES=1 + NWORKERS=1 # these embedding jobs process only light pp type signals ... 
the parallelism will come via parallel workflows +fi + +# call the python script to extract all collision contexts +python3 ${O2DPG_ROOT}/MC/utils/o2dpg_data_embedding_utils.py --aod-file ${AOD_DATA_FILE} --run-number ${ALIEN_JDL_LPMRUNNUMBER} --limit ${DATA_EMBEDDING_LIMIT:-8} + +parallel_job_count=0 +failed_count=0 +for external_context in collission_context_*.root; do + echo "Embedding into ${external_context}" + # extract timeframe from name + anchoring_tf="${external_context#collission_context_}" # remove prefix 'collision_context_' + anchoring_tf="${anchoring_tf%.root}" # remove suffix '.root' + echo "Treating timeframe ${anchoring_tf}" + + # we do it in a separate workspace + workspace="TF_${anchoring_tf}" + mkdir "${workspace}"; cd "${workspace}" + # fetch the apass reco anchoring config + cp ../*.json . + + # we need to adjust the SEED for each job + JOBSEED=$SEED + [ "$JOBSEED" != "-1" ] && let JOBSEED=JOBSEED+anchoring_tf + echo "TF ${anchoring_tf} got seed ${JOBSEED}" + + # these arguments will be digested by o2dpg_sim_workflow_anchored.py + anchoringArgs="--split-id ${SPLITID} --prod-split ${PRODSPLIT} --cycle ${CYCLE}" + if [ "${ALIEN_JDL_MC_DATA_EMBEDDING_AO2D}" ]; then + anchoringArgs="--timeframeID ${anchoring_tf}" + fi + baseargs="-tf ${NTIMEFRAMES} ${anchoringArgs} --run-number ${ALIEN_JDL_LPMRUNNUMBER} \ + ${ALIEN_JDL_RUN_TIME_SPAN_FILE:+--run-time-span-file ${ALIEN_JDL_RUN_TIME_SPAN_FILE} ${ALIEN_JDL_INVERT_IRFRAME_SELECTION:+--invert-irframe-selection}} \ + ${ALIEN_JDL_MC_ORBITS_PER_TF:+--orbitsPerTF ${ALIEN_JDL_MC_ORBITS_PER_TF}} ${PUBLISH_MCPRODINFO_OPTION}" + + # these arguments will be passed as well but only eventually be digested by o2dpg_sim_workflow.py which is called from o2dpg_sim_workflow_anchored.py + remainingargs="-seed ${JOBSEED} -ns ${NSIGEVENTS} --include-local-qc --pregenCollContext" + remainingargs="${remainingargs} -e ${ALIEN_JDL_SIMENGINE} -j ${NWORKERS}" + remainingargs="${remainingargs} -productionTag 
${ALIEN_JDL_LPMPRODUCTIONTAG:-alibi_anchorTest_tmp}" + # prepend(!) ALIEN_JDL_ANCHOR_SIM_OPTIONS + # since the last passed argument wins, e.g. -productionTag cannot be overwritten by the user + remainingargs="${ALIEN_JDL_ANCHOR_SIM_OPTIONS} ${remainingargs} --anchor-config config-json.json" + # apply software tagging choice + # remainingargs="${remainingargs} ${ALIEN_JDL_O2DPG_ASYNC_RECO_TAG:+--alternative-reco-software ${ALIEN_JDL_O2DPG_ASYNC_RECO_TAG}}" + ALIEN_JDL_O2DPG_ASYNC_RECO_FROMSTAGE=${ALIEN_JDL_O2DPG_ASYNC_RECO_FROMSTAGE:-RECO} + remainingargs="${remainingargs} ${ALIEN_JDL_O2DPG_ASYNC_RECO_TAG:+--alternative-reco-software ${PWD}/env_async.env@${ALIEN_JDL_O2DPG_ASYNC_RECO_FROMSTAGE}}" + # potentially add CCDB timemachine timestamp + remainingargs="${remainingargs} ${ALIEN_JDL_CCDB_CONDITION_NOT_AFTER:+--condition-not-after ${ALIEN_JDL_CCDB_CONDITION_NOT_AFTER}}" + # add external collision context injection + if [ "${ALIEN_JDL_MC_DATA_EMBEDDING_AO2D}" ]; then + remainingargs="${remainingargs} --data-anchoring ${PWD}/../${external_context}" + fi + + echo_info "baseargs passed to o2dpg_sim_workflow_anchored.py: ${baseargs}" + echo_info "remainingargs forwarded to o2dpg_sim_workflow.py: ${remainingargs}" + + anchoringLogFile=timestampsampling_${ALIEN_JDL_LPMRUNNUMBER}.log + # query CCDB has changed, w/o "_" + ${O2DPG_ROOT}/MC/bin/o2dpg_sim_workflow_anchored.py ${baseargs} -- ${remainingargs} &> ${anchoringLogFile} + WF_RC="${?}" + if [ "${WF_RC}" != "0" ] ; then + echo_error "Problem during anchor timestamp sampling and workflow creation. Exiting." 
+ exit ${WF_RC} + fi + + TIMESTAMP=`grep "Determined timestamp to be" ${anchoringLogFile} | awk '//{print $6}'` + echo_info "TIMESTAMP IS ${TIMESTAMP}" + + if [ "${ONLY_WORKFLOW_CREATION}" ]; then + continue # or exit + fi + + # check if this job is exluded because it falls inside a bad data-taking period + ISEXCLUDED=$(grep "TIMESTAMP IS EXCLUDED IN RUN" ${anchoringLogFile}) + if [ "${ISEXCLUDED}" ]; then + # we can quit here; there is nothing to do + # (apart from maybe creating a fake empty AO2D.root file or the like) + echo "Timestamp is excluded from run. Nothing to do here" + continue # or exit 0 + fi + + # -- RUN THE MC WORKLOAD TO PRODUCE TARGETS -- + + export FAIRMQ_IPC_PREFIX=./ + echo_info "Ready to start main workflow" + + # Let us construct the workflow targets + targetString="" + if [ "${ALIEN_JDL_O2DPGWORKFLOWTARGET}" ]; then + # The user gave ${ALIEN_JDL_O2DPGWORKFLOWTARGET}. This is an expert mode not used in production. + # In this case, we will build just that. No QC, no TPC timeseries, ... 
+ targetString=${ALIEN_JDL_O2DPGWORKFLOWTARGET} + else + targetString="'aodmerge.*'" + # Now add more targets depending on options + # -) The TPC timeseries targets + if [[ "${ALIEN_JDL_ADDTIMESERIESINMC}" == "1" ]]; then + targetString="${targetString} 'tpctimes.*'" + fi + # -) TPC residual calibration + if [ "${ALIEN_JDL_DOTPCRESIDUALSEXTRACTION}" ]; then + targetString="${targetString} 'tpcresidmerge.*'" + fi + # -) QC tasks + if [[ -z "${DISABLE_QC}" && "${remainingargs}" == *"--include-local-qc"* ]]; then + targetString="${targetString} '^.*QC.*'" # QC tasks should have QC in the name + fi + fi + echo_info "Workflow will run with target specification ${targetString}" + + ${O2DPG_ROOT}/MC/bin/o2_dpg_workflow_runner.py -f workflow.json -tt ${targetString} -j 1 \ + --cpu-limit ${ALIEN_JDL_CPULIMIT:-8} --dynamic-resources \ + ${ALIEN_O2DPG_FILEGRAPH:+--remove-files-early ${ALIEN_O2DPG_FILEGRAPH}} \ + ${ALIEN_O2DPG_ADDITIONAL_WORKFLOW_RUNNER_ARGS} &> pipeline_log & + + ((parallel_job_count++)) + # If limit reached, wait for one job to finish + if ((parallel_job_count >= 8)); then + if ! wait -n; then + ((failed_count++)) + fi + ((parallel_job_count--)) + fi + + cd .. +done # done outer loop +while ((parallel_job_count > 0)); do + if ! wait -n; then + ((failed_count++)) + fi + ((parallel_job_count--)) +done + +if [ "${ALIEN_JDL_MC_DATA_EMBEDDING_AO2D}" ]; then + # produce the final merged AO2D + find ./ -maxdepth 2 -mindepth 2 -name "AO2D.root" > aod_inputs.txt + o2-aod-merger --input aod_inputs.txt --output AO2D.root +fi + +# +# full logs tar-ed for output, regardless the error code or validation - to catch also QC logs... 
+# +if [[ -n "$ALIEN_PROC_ID" ]]; then + find ./ \( -name "*.log*" -o -name "*mergerlog*" -o -name "*serverlog*" -o -name "*workerlog*" -o -name "pythia8.cfg" -o -name "reproducer*.sh" \) | tar -czvf debug_log_archive.tgz -T - + if [[ "$ALIEN_JDL_CREATE_TAR_IN_MC" == "1" ]]; then + find ./ \( -name "*.log*" -o -name "*mergerlog*" -o -name "*serverlog*" -o -name "*workerlog*" -o -name "*.root" \) | tar -czvf debug_full_archive.tgz -T - + fi +fi diff --git a/MC/run/ANCHOR/tests/data_embedding/run.sh b/MC/run/ANCHOR/tests/data_embedding/run.sh new file mode 100755 index 000000000..f11cf822e --- /dev/null +++ b/MC/run/ANCHOR/tests/data_embedding/run.sh @@ -0,0 +1,30 @@ +#!/bin/bash +# +# Simple test for Data anchoring +# + +export ALIEN_JDL_LPMANCHORPASSNAME=apass2 +export ALIEN_JDL_MCANCHOR=apass2 +export ALIEN_JDL_CPULIMIT=8 +export ALIEN_JDL_LPMRUNNUMBER=544742 +export ALIEN_JDL_LPMPRODUCTIONTYPE=MC +export ALIEN_JDL_LPMINTERACTIONTYPE=pp +export ALIEN_JDL_LPMPRODUCTIONTAG=LHC24a2 +export ALIEN_JDL_LPMANCHORRUN=544742 +export ALIEN_JDL_LPMANCHORPRODUCTION=LHC23f +export ALIEN_JDL_LPMANCHORYEAR=2023 + +# need to give a data AOD +export ALIEN_JDL_MC_DATA_EMBEDDING_AO2D="alien:///alice/data/2023/LHC23zzm/544742/apass5/0000/o2_ctf_run00544742_orbit0137377824_tf0002239365_epn262/002/AO2D.root" + +export NTIMEFRAMES=1 +export NWORKERS=1 +export SPLITID=1 +export PRODSPLIT=1 +export ALIEN_JDL_O2DPGWORKFLOWTARGET=aod + +# setup of the signal generator +export ALIEN_JDL_ANCHOR_SIM_OPTIONS="-gen pythia8pp" + +export DISABLE_QC=1 +${O2DPG_ROOT}/MC/run/ANCHOR/anchorMC_DataEmbedding.sh diff --git a/MC/utils/o2dpg_data_embedding_utils.py b/MC/utils/o2dpg_data_embedding_utils.py new file mode 100644 index 000000000..dad1769a9 --- /dev/null +++ b/MC/utils/o2dpg_data_embedding_utils.py @@ -0,0 +1,333 @@ +# Set of python modules/util functions for the MC-to-DATA embedding +# Mostly concerning extraction of MC collision context from existing data AO2D.root + +import ROOT +import 
uproot +import pandas as pd +import re +from ROOT import o2 # for CCDB +import argparse +import sys + +class lhc_constants: + LHCMaxBunches = 3564 # max N bunches + LHCRFFreq = 400.789e6 # LHC RF frequency in Hz + LHCBunchSpacingNS = 10 * 1.e9 / LHCRFFreq # bunch spacing in ns (10 RFbuckets) + LHCOrbitNS = LHCMaxBunches * LHCBunchSpacingNS # orbit duration in ns + LHCRevFreq = 1.e9 / LHCOrbitNS # revolution frequency + LHCBunchSpacingMUS = LHCBunchSpacingNS * 1e-3 # bunch spacing in \mus (10 RFbuckets) + LHCOrbitMUS = LHCOrbitNS * 1e-3 + +def thin_AO2D_file(input_file): + """ + A function to thin an existing AO2D file by just keeping a single DF_ folder + """ + + # Open the input ROOT file + infile = ROOT.TFile.Open(input_file, "READ") + + # Find the first TDirectory starting with "DF_" + df_dir = None + dir_name = "" + for key in infile.GetListOfKeys(): + name = key.GetName() + if name.startswith("DF_"): + # Access the TDirectory + df_dir = infile.Get(name) + dir_name = name + break + + if not df_dir: + raise RuntimeError("No TDirectory starting with 'DF_' found.") + + # Open the output file (create if not exist) + output_file = "AO2D_reduced_" + str(dir_name) + ".root" + outfile = ROOT.TFile.Open(output_file, "RECREATE") + + # Create the same directory structure in the output file + df_dir_copy = outfile.mkdir(dir_name) + + # Move to the newly created directory + df_dir_copy.cd() + + # Loop over the keys (trees) inside the "DF_" directory and copy them + for key in df_dir.GetListOfKeys(): + obj = df_dir.Get(key.GetName()) + if isinstance(obj, ROOT.TTree): # Check if it's a TTree + # Clone the tree and write it to the corresponding directory in the output file + obj.CloneTree(-1).Write(key.GetName(), ROOT.TObject.kOverwrite) # Copy the tree + + # Now handle the metaData;1 key (TMap) in the top-level directory + meta_data = infile.Get("metaData") + if meta_data: + if isinstance(meta_data, ROOT.TMap): + copied_meta_data = meta_data.Clone() + outfile.cd() # Make sure 
we're at the top-level in the output file + outfile.WriteObject(meta_data, "metaData") + + # Iterate over the map + iter = meta_data.MakeIterator() + entry = iter.Next() + while entry: + key = entry + value = meta_data.GetValue(key) + + # Convert TObjString to Python string + key_str = key.GetName() + value_str = value.GetName() if value else "None" + print(f"{key_str}: {value_str}") + entry = iter.Next() + + # Close the files + outfile.Close() + infile.Close() + + print(f"Copied all trees from TDirectory '{dir_name}' to '{output_file}'.") + + +def retrieve_Aggregated_RunInfos(run_number): + """ + Retrieves the aggregated runinfo object ... augmented with the number of timeframes + """ + runInfo = o2.parameters.AggregatedRunInfo.buildAggregatedRunInfo(o2.ccdb.BasicCCDBManager.instance(), run_number) + detList = o2.detectors.DetID.getNames(runInfo.grpECS.getDetsReadOut()) + assert (run_number == runInfo.runNumber) + assert (run_number == runInfo.grpECS.getRun()) + + run_info = {"SOR" : runInfo.sor, + "EOR" : runInfo.eor, + "FirstOrbit" : runInfo.orbitSOR, + "LastOrbit" : runInfo.orbitEOR, + "OrbitReset" : runInfo.orbitReset, + "OrbitsPerTF" : int(runInfo.orbitsPerTF), + "detList" : detList} + + # update num of timeframes + # figure out how many timeframes fit into this run range + # take the number of orbits per timeframe and multiply by orbit duration to calculate how many timeframes fit into this run + time_length_inmus = 1000 * (run_info["EOR"] - run_info["SOR"]) + ntimeframes = time_length_inmus / (run_info["OrbitsPerTF"] * lhc_constants.LHCOrbitMUS) + run_info["ntimeframes"] = ntimeframes + + return run_info + + +def get_bc_with_timestamps(bc_data, run_info): + """ + bc_data is a pandas df containing the AO2D basic bunch crossing data. + Returns the bc table with additional information on timeframeID etc. 
+ """ + + # add a new column to the bc table dynamically + # this is the time in mu s + bc_data["timestamp"] = run_info["OrbitReset"] + (bc_data["fGlobalBC"] * lhc_constants.LHCBunchSpacingMUS).astype("int64") + bc_data["timeframeID"] = ((bc_data["fGlobalBC"] - (run_info["FirstOrbit"] * lhc_constants.LHCMaxBunches)) / (lhc_constants.LHCMaxBunches * run_info["OrbitsPerTF"])).astype("int64") + bc_data["orbit"] = (bc_data["fGlobalBC"] // lhc_constants.LHCMaxBunches).astype("int64") + bc_data["bc_within_orbit"] = (bc_data["fGlobalBC"] % lhc_constants.LHCMaxBunches).astype("int64") + return bc_data + + +def get_timeframe_structure(filepath, run_info, max_folders=1, include_dataframe = False, folder_filter=None): + """ + run_info: The aggregated run_info object for this run + """ + def find_tree_key(keys, pattern): + for key in keys: + key_clean = key + if re.search(pattern, key_clean, re.IGNORECASE): + return key_clean + return None + + file = uproot.open(filepath) + raw_keys = file.keys() + + folders = { k.split("/")[0] : 1 for k in raw_keys if "O2bc_001" in k } + folders = [ k for k in folders.keys() ] + folders = folders[:max_folders] + + print ("have ", len(raw_keys), f" in file {filepath}") + + merged = {} # data containers per file + for folder in folders: + if folder_filter != None and folder != folder_filter: + continue + #print (f"Looking into {folder}") + + # Find correct table names using regex + bc_key = find_tree_key(raw_keys, f"^{folder}/O2bc_001") + bc_data = file[bc_key].arrays(library="pd") + + # collision data + coll_key = find_tree_key(raw_keys, f"^{folder}/O2coll.*_001") + coll_data = file[coll_key].arrays(library="pd") + + # extend the data + bc_data = get_bc_with_timestamps(bc_data, run_info) + + # do the splice with collision data + bc_data_coll = bc_data.iloc[coll_data["fIndexBCs"]].reset_index(drop=True) + # this is the combined table containing collision data associated to bc and time information + combined = pd.concat([bc_data_coll, 
coll_data], axis = 1) + + # do the actual timeframe structure calculation; we only take collisions with a trigger decision attached + triggered = combined[combined["fTriggerMask"] != 0] + timeframe_structure = triggered.groupby('timeframeID').apply( + lambda g: list(zip(g['fGlobalBC'], g['fPosX'], g['fPosY'], g['fPosZ'], g['orbit'], g['bc_within_orbit'], g['fCollisionTime'])) + ).reset_index(name='position_vectors') + + folderkey = folder + '@' + filepath + merged[folderkey] = timeframe_structure # data per folder + if include_dataframe: + merged["data"] = combined + + # annotate which timeframes are available here and from which file + return merged + + +def fetch_bccoll_to_localFile(alien_file, local_filename): + """ + A function to remotely talk to a ROOT file ... and fetching only + BC and collision tables for minimal network transfer. Creates a ROOT file locally + of name local_filename. + + Returns True if success, otherwise False + """ + + # make sure we have a TGrid connection + # Connect to AliEn grid + if not ROOT.gGrid: + ROOT.TGrid.Connect("alien://") + + if not ROOT.gGrid: + print (f"Not TGrid object found ... 
aborting") + return False + + # Open the remote file via AliEn + infile = ROOT.TFile.Open(alien_file, "READ") + if not infile or infile.IsZombie(): + raise RuntimeError(f"Failed to open {alien_file}") + return False + + # Output local file + outfile = ROOT.TFile.Open(local_filename, "RECREATE") + + # List of trees to copy + trees_to_copy = ["O2bc_001", "O2collision_001"] + + # Loop over top-level keys to find DF_ folders + for key in infile.GetListOfKeys(): + obj = key.ReadObj() + if obj.InheritsFrom("TDirectory") and key.GetName().startswith("DF_"): + df_name = key.GetName() + df_dir = infile.Get(df_name) + + # Create corresponding folder in output file + out_df_dir = outfile.mkdir(df_name) + out_df_dir.cd() + + # Copy only specified trees if they exist + for tree_name in trees_to_copy: + if df_dir.GetListOfKeys().FindObject(tree_name): + tree = df_dir.Get(tree_name) + cloned_tree = tree.CloneTree(-1) # copy all entries + cloned_tree.Write(tree_name) + + outfile.cd() # go back to top-level for next DF_ + + # Close files + outfile.Close() + infile.Close() + return True + + +def convert_to_digicontext(aod_timeframe=None, timeframeID=-1): + """ + converts AOD collision information from AO2D to collision context + which can be used for MC + """ + # we create the digitization context object + digicontext=o2.steer.DigitizationContext() + + # we can fill this container + parts = digicontext.getEventParts() + # we can fill this container + records = digicontext.getEventRecords() + # copy over information + maxParts = 1 + + entry = 0 + vertices = ROOT.std.vector("o2::math_utils::Point3D")() + vertices.resize(len(aod_timeframe)) + + colindex = 0 + for colindex, col in enumerate(aod_timeframe): + # we make an event interaction record + pvector = ROOT.std.vector("o2::steer::EventPart")() + pvector.push_back(o2.steer.EventPart(0, colindex)) + parts.push_back(pvector) + + orbit = col[4] + bc_within_orbit = col[5] + interaction_rec = o2.InteractionRecord(bc_within_orbit, orbit) 
+ col_time_relative_to_bc = col[6] # in NS + time_interaction_rec = o2.InteractionTimeRecord(interaction_rec, col_time_relative_to_bc) + records.push_back(time_interaction_rec) + vertices[colindex].SetX(col[1]) + vertices[colindex].SetY(col[2]) + vertices[colindex].SetZ(col[3]) + + digicontext.setInteractionVertices(vertices) + digicontext.setNCollisions(vertices.size()) + digicontext.setMaxNumberParts(maxParts) + + # set the bunch filling ---> NEED to fetch it from CCDB + # digicontext.setBunchFilling(bunchFillings[0]); + + prefixes = ROOT.std.vector("std::string")(); + prefixes.push_back("sgn") + + digicontext.setSimPrefixes(prefixes); + digicontext.printCollisionSummary(); + digicontext.saveToFile(f"collission_context_{timeframeID}.root") + + +def process_data_AO2D(file_name, run_number, upper_limit = -1): + """ + Creates all the collision contexts + """ + timeframe_data = [] + + local_filename = "local.root" + fetch_bccoll_to_localFile(file_name, local_filename) + + # fetch run_info object + run_info = retrieve_Aggregated_RunInfos(run_number) + merged = get_timeframe_structure(local_filename, run_info, max_folders=1000) + print ("Got " + str(len(merged)) + " datasets") + timeframe_data.append(merged) + + counter = 0 + for d in timeframe_data: + for key in d: + result = d[key] + for index, row in result.iterrows(): + if upper_limit >= 0 and counter >= upper_limit: + break + tf = row['timeframeID'] + cols = row['position_vectors'] + convert_to_digicontext(cols, tf) + counter = counter + 1 + + +def main(): + parser = argparse.ArgumentParser(description='Extracts collision contexts from reconstructed AO2D') + + parser.add_argument("--run-number", type=int, help="Run number to anchor to", required=True) + parser.add_argument("--aod-file", type=str, help="Data AO2D file (can be on AliEn)", required=True) + parser.add_argument("--limit", type=int, default=-1, help="Upper limit of timeframes to be extracted") + args = parser.parse_args() + + 
process_data_AO2D(args.aod_file, args.run_number, args.limit) + +if __name__ == "__main__": + sys.exit(main())