Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
1d55c4a
software veto plugins
cfuselli Feb 16, 2023
f107939
adding software veto aqmon intervals
cfuselli Feb 16, 2023
6a54996
fixed, now works
cfuselli Feb 16, 2023
54d48e9
aah, it was me
cfuselli Feb 16, 2023
4001eae
inverted mask..
cfuselli Feb 16, 2023
d7bce26
fixed veto proximity
cfuselli Feb 16, 2023
200ab76
base plugin
cfuselli Feb 17, 2023
dd92b53
edit init
cfuselli Feb 17, 2023
cfbb972
modified plugin structure
cfuselli Feb 22, 2023
aa48e42
modified plugin structures
cfuselli Feb 22, 2023
0c0ffc0
refactor
cfuselli Feb 23, 2023
f35f576
add prescaling option
cfuselli Feb 23, 2023
1119aab
url config
cfuselli Feb 23, 2023
af1746a
allow for both peaks and event cuts
cfuselli Feb 23, 2023
47c1545
change deps, thx joran
cfuselli Feb 23, 2023
057f114
Merge pull request #1 from cfuselli/software_veto
cfuselli Mar 3, 2023
e81f6a1
start work on bootstrax
cfuselli Mar 5, 2023
7973120
hack you
cfuselli Mar 5, 2023
0cdf70d
fix indeces
cfuselli Mar 5, 2023
d924062
Merge pull request #3 from cfuselli - Remove hardcoded data_types in …
cfuselli Mar 5, 2023
ce9af24
start working on bootstrax
cfuselli Mar 5, 2023
8cdc7c4
Merge branch 'daq_sv' of github.com:cfuselli/straxen into daq_sv
cfuselli Mar 5, 2023
a81a92b
lot of fixing stuff
cfuselli Mar 6, 2023
b617131
fix temp fly plugin
cfuselli Mar 6, 2023
0fb2d17
forgot remove print
cfuselli Mar 6, 2023
5170710
workaround for picklable classes, works
cfuselli Mar 6, 2023
4702748
gave up, classes in a file
cfuselli Mar 6, 2023
f06dc6f
fix copies
cfuselli Mar 6, 2023
3d213bb
fix copies
cfuselli Mar 6, 2023
bc239aa
fix fix fix
cfuselli Mar 6, 2023
51c4a83
dirty but maybe works
cfuselli Mar 7, 2023
9d959aa
dirty but maybe works
cfuselli Mar 7, 2023
6da9bef
fix data kind
cfuselli Mar 7, 2023
1fa442d
fix infer_dtype, it worksgit add . !
cfuselli Mar 7, 2023
94efe07
remove extra lines
cfuselli Mar 7, 2023
12e4566
fix and add straxer compatibility
cfuselli Mar 7, 2023
756a688
fix plugin repetition
cfuselli Mar 7, 2023
8254aba
final adjustments plugins
cfuselli Mar 8, 2023
8601ec5
daq worflow in progress
cfuselli Mar 8, 2023
f84923c
pre test commit
cfuselli Mar 8, 2023
4978129
soome bugs, now works
cfuselli Mar 8, 2023
cd2db90
clean functions thx joran
cfuselli Mar 8, 2023
aa497e4
change channel
cfuselli Mar 8, 2023
6ecb059
revert temporary
cfuselli Mar 8, 2023
7e79a14
change channel
cfuselli Mar 12, 2023
eabc99b
testing workflow
cfuselli Mar 13, 2023
4b4722f
add dummyveto, refactor restrax
cfuselli Mar 14, 2023
dfb34ce
refactor restrax sv
cfuselli Mar 16, 2023
3c9fc82
change software veto docs
cfuselli Mar 17, 2023
9ac9d77
bootstrax now fantastic, accept rd
cfuselli Mar 17, 2023
8a211eb
bootstrax fail nv_sv
cfuselli Mar 17, 2023
3d3ddd9
fixed restrax.. badly
cfuselli Mar 26, 2023
a5dabe0
remove carlo paths
cfuselli Mar 26, 2023
b9fb24d
bug daqreader
cfuselli Mar 26, 2023
53ffbd7
updates for testing
cfuselli Mar 31, 2023
4174b59
fix test
cfuselli Mar 31, 2023
77cfb9f
add bootstrax profile mem
cfuselli Mar 31, 2023
5a6b3a1
add bootstrax profile mem
cfuselli Mar 31, 2023
2f98996
can t do it
cfuselli Mar 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ custom_data
test_input_data
*.zip
last_bootstrax_exception.txt
bootstrax_exceptions

# cProfile output
*.prof
Expand Down
110 changes: 101 additions & 9 deletions bin/bootstrax
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ How to use
For more info, see the documentation:
https://straxen.readthedocs.io/en/latest/bootstrax.html
"""
__version__ = '2.0.0'
__version__ = '3.0.0'

import argparse
import typing
from datetime import datetime, timedelta, timezone
import logging
import multiprocessing
import multiprocess
import npshmex
import os
import os.path as osp
Expand All @@ -39,6 +40,7 @@ import straxen
import threading
import pandas as pd
import typing as ty
from immutabledict import immutabledict
import daqnt
import fnmatch
from glob import glob
Expand Down Expand Up @@ -86,7 +88,12 @@ parser.add_argument(
parser.add_argument(
'--max_messages', type=int, default=10,
help="number of max mailbox messages")

parser.add_argument(
'--software_veto_overwrite', default=None,
help="Class name of veto plugin to apply (i.e.: RadialVeto). It's overwritten by rundoc value.")
parser.add_argument(
'--test_input_folder', default=None,
help="Add to storage")

actions = parser.add_mutually_exclusive_group()
actions.add_argument(
Expand All @@ -109,8 +116,8 @@ args = parser.parse_args()

# The folder that can be used for testing bootstrax (i.e. non production
# mode). It will be written to:
test_data_folder = ('/data/test_processed/' if
os.path.exists('/data/test_processed/')
test_data_folder = ('/data/test_pre_processed/' if
os.path.exists('/data/test_pre_processed/')
else './bootstrax/')

# Timeouts in seconds
Expand Down Expand Up @@ -270,6 +277,9 @@ if os.access(output_folder, os.W_OK) is not True:
raise IOError(message)





def new_context(cores=args.cores,
max_messages=args.max_messages,
timeout=500,
Expand All @@ -293,8 +303,11 @@ def new_context(cores=args.cores,
# all other storage frontends except fo the test folder.
context.storage = [context.storage[0],
strax.DataDirectory(output_folder)]
if args.test_input_folder is not None:
context.storage += [strax.DataDirectory(args.test_input_folder, readonly=True)]
context.storage[0].readonly = True
context.storage[0].local_only = True

return context


Expand All @@ -316,6 +329,7 @@ if not args.undying:


def main():

if args.cores == -1:
# Use all of the available cores on this machine
args.cores = multiprocessing.cpu_count()
Expand Down Expand Up @@ -505,7 +519,7 @@ def keep_target(targets, compare_with, n_fails):
return kept_targets


def infer_target(rd: dict) -> dict:
def infer_target(rd: dict, software_veto_on: bool = False) -> dict:
"""
Check if the target should be overridden based on the mode of the DAQ for this run
:param rd: rundoc
Expand All @@ -514,6 +528,20 @@ def infer_target(rd: dict) -> dict:
targets = args.targets.copy()
post_process = args.post_process.copy()

if software_veto_on:
# TODO study which targets_sv is better to have in targets or post
# TODO THIS is bad!!
_targets = list(targets)
_targets.append('raw_records_sv')
_targets.append('peaklets_sv')

_post = list(post_process)
_post.append('event_info_sv')

targets = tuple(_targets)
post_process = tuple(_post)


if args.fix_target:
return {'targets': strax.to_str_tuple(targets),
'post_processing': strax.to_str_tuple(post_process)}
Expand Down Expand Up @@ -579,6 +607,8 @@ def infer_target(rd: dict) -> dict:
f'processing up to {targets} and postprocessing '
f'to {post_process}')



if targets is None or not len(targets):
targets = 'raw_records'
if post_process is None or not len(post_process):
Expand Down Expand Up @@ -1169,7 +1199,7 @@ def manual_fail(*, mongo_id=None, number=None, reason=''):
def run_strax(run_id, input_dir, targets, readout_threads, compressor,
run_start_time, samples_per_record, cores, max_messages, timeout,
daq_chunk_duration, daq_overlap_chunk_duration, post_processing,
records_compressor, debug=False):
records_compressor, software_veto_name=None, debug=False):
# Check mongo connection
ping_dbs()
# Clear the swap memory used by npshmmex
Expand All @@ -1193,9 +1223,19 @@ def run_strax(run_id, input_dir, targets, readout_threads, compressor,
timeout=timeout,
)

if software_veto_name is not None:
# If the software veto is on, let's register the correct extra plugins
st = software_veto_register_plugins(st, software_veto_name)

for t in ('raw_records', 'records', 'records_nv', 'hitlets_nv'):
# Set the (raw)records processor to the inferred one
st._plugin_class_registry[t].compressor = records_compressor
try:
st._plugin_class_registry[t+'_sv'].compressor = records_compressor
except:
pass



# Make a function for running strax, call the function to process the run
# This way, it can also be run inside a wrapper to profile strax
Expand Down Expand Up @@ -1307,6 +1347,9 @@ def process_run(rd, send_heartbeats=args.production):
# or use the test-dir:
if not osp.exists(loc):
loc = os.path.join('/live_data/xenonnt_bootstrax_test/', run_id)
# or maybe it's in the /data dir:
if not osp.exists(loc):
loc = os.path.join('/data/xenonnt_bootstrax_test/', run_id)

else:
for dd in rd['data']:
Expand Down Expand Up @@ -1369,10 +1412,14 @@ def process_run(rd, send_heartbeats=args.production):
except Exception as e:
fail(f"Could not find start in datetime.datetime object: {str(e)}")

run_strax_config.update(infer_target(rd))
software_veto_on = software_veto_get_name(rd, args)['software_veto_name'] is not None
run_strax_config.update(infer_target(rd, software_veto_on))
run_strax_config.update(infer_mode(rd))
run_strax_config.update(software_veto_get_name(rd, args))
run_strax_config['debug'] = args.debug
strax_proc = multiprocessing.Process(

log.info(f"Processing run!")
strax_proc = multiprocess.Process(
target=run_strax,
kwargs=run_strax_config)

Expand Down Expand Up @@ -1642,7 +1689,52 @@ def cleanup_db():
abandon(mongo_id=rd['_id'])




###########################
# Software veto functions #
###########################



def software_veto_get_name(rd, args=None):
"""Decide if software veto is on based on arguments or rundoc"""
if args is not None:
if args.software_veto_overwrite is not None:
return {'software_veto_name': args.software_veto_overwrite}

return {'software_veto_name': rd.get('software_veto', None)}


def software_veto_register_plugins(st, veto_name):
"""Based on the selection that we want to apply,
register the correct veto plugin.
Probably we want to pass it as argument and if possible overwrite it from rundoc.
Then register a copy of all the plugins.
"""

import straxen.plugins.raw_records_sv.software_veto as software_veto
raw_records_sv_plugin = getattr(software_veto, veto_name)
st.register(raw_records_sv_plugin)

import straxen.plugins.raw_records_sv._software_veto_copies as _software_veto_copies
st.register_all(_software_veto_copies)

return st




###########################
# __MAIN__ #
###########################

if __name__ == '__main__':

# to avoid warnings pymongo fork
multiprocessing.set_start_method('spawn')
multiprocess.set_start_method('spawn')

if not args.undying:
main()
else:
Expand All @@ -1653,7 +1745,7 @@ if __name__ == '__main__':
raise
except Exception as fatal_error:
log.error(f'Fatal warning:\tran into {fatal_error}. Try '
f'logging error and restart bootstrax')
f'logging error and restart bootstrax')
try:
log_warning(f'Fatal warning:\tran into {fatal_error}',
priority='error')
Expand Down
Loading