add rule to build dsp pars for SiPM data #98

Merged · 23 commits · Feb 25, 2025
5 changes: 5 additions & 0 deletions .github/workflows/main.yml
@@ -59,6 +59,7 @@ jobs:
- name: Set LEGEND_METADATA variable
run: |
echo "LEGEND_METADATA=$GITHUB_WORKSPACE/inputs" >> $GITHUB_ENV

- name: Clone legend-metadata
uses: actions/checkout@v4
with:
@@ -68,6 +69,10 @@ jobs:
token: ${{ secrets.CLONE_LEGEND_METADATA }}
path: ${{ env.LEGEND_METADATA }}

- name: Recursively update legend-metadata submodules
run: |
cd "$LEGEND_METADATA" && git submodule update --recursive --remote

- name: Run data production tests
run: ./tests/runprod/run-all.sh

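The new CI step updates the nested legend-metadata submodules in place. A minimal local equivalent, assuming LEGEND_METADATA points at the metadata checkout as in the workflow above:

    cd "$LEGEND_METADATA"
    git submodule update --recursive --remote
    # verify: list each submodule and the commit it now sits on
    git submodule status --recursive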
6 changes: 4 additions & 2 deletions dataflow-config.yaml
@@ -1,4 +1,5 @@
legend_metadata_version: main
# legend_metadata_version: main
allow_none_par: false

paths:
sandbox_path: $_/sandbox
@@ -74,8 +75,8 @@ execenv:
arg: /data2/public/prodenv/containers/legendexp_legend-base_latest_20241110203225.sif
env:
PRODENV: $PRODENV
NUMBA_CACHE_DIR: $_/.snakemake/numba-cache
LGDO_BOUNDSCHECK: "false"
# LGDO_CACHE: "false"
DSPEED_BOUNDSCHECK: "false"
PYGAMA_PARALLEL: "false"
PYGAMA_FASTMATH: "false"
@@ -86,6 +87,7 @@ execenv:
arg: --image legendexp/legend-base:latest
env:
PRODENV: $PRODENV
NUMBA_CACHE_DIR: $_/.snakemake/numba-cache
HDF5_USE_FILE_LOCKING: "false"
LGDO_BOUNDSCHECK: "false"
DSPEED_BOUNDSCHECK: "false"
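The new allow_none_par key defaults to false here; it can be overridden per invocation through Snakemake's --config mechanism, which is exactly what the argon-characterization test added below does:

    snakemake --config allow_none_par=true \
        --workflow-profile workflow/profiles/default "all-p13-*-evt.gen"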
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -52,7 +52,7 @@ dynamic = ["version"]
dependencies = [
"colorlog",
"dbetto>=1.2",
"pygama>=2",
"pygama>=2.0.5",
"dspeed>=1.6",
"pylegendmeta>=1.2",
"legend-pydataobj>=1.11.6",
@@ -116,6 +116,7 @@ par-geds-psp-average = "legenddataflow.scripts.par.geds.psp.average:par_geds_
par-geds-raw-blindcal = "legenddataflow.scripts.par.geds.raw.blindcal:par_geds_raw_blindcal"
par-geds-raw-blindcheck = "legenddataflow.scripts.par.geds.raw.blindcheck:par_geds_raw_blindcheck"
par-geds-tcm-pulser = "legenddataflow.scripts.par.geds.tcm.pulser:par_geds_tcm_pulser"
par-spms-dsp-trg-thr = "legenddataflow.scripts.par.spms.dsp.trigger_threshold:par_spms_dsp_trg_thr"

[tool.uv.workspace]
exclude = ["generated", "inputs", "software", "workflow"]
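After installation the new console script is available on PATH. A hedged sketch of invoking it (the actual flags live in trigger_threshold.py and are not part of this diff):

    pip install -e .
    # hypothetical smoke test; consult the script's argument parser for real options
    par-spms-dsp-trg-thr --help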
35 changes: 35 additions & 0 deletions tests/runprod/test-argon-char-dataprod.sh
@@ -0,0 +1,35 @@
#!/usr/bin/env bash

# IMPORTANT: this script must be executed from the legend-dataflow directory

# shellcheck disable=SC1091
source "$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &> /dev/null && pwd)/conftest.sh"

rawdir="$(get_dataflow_config_value paths.tier_raw)"
mkdir -p "${rawdir}" || exit 1

function mkdir_n_touch() {
mkdir -p "$(dirname "${1}")" || return 1
touch "${1}" || return 1
}

rawfiles=(
anp/p13/r002/l200-p13-r002-anp-20241217T094846Z-tier_raw.lh5
anc/p13/r006/l200-p13-r006-anc-20241221T150249Z-tier_raw.lh5
acs/p13/r006/l200-p13-r006-acs-20241221T150307Z-tier_raw.lh5
)

(
cd "${rawdir}" || exit 1
for file in "${rawfiles[@]}"; do
mkdir_n_touch "$file"
done
)

_smk_opts=(
--touch
--config allow_none_par=true
--workflow-profile workflow/profiles/default
)

run_test_command snakemake "${_smk_opts[@]}" "all-p13-*-evt.gen" || exit 1
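Per the IMPORTANT comment at the top, the script assumes the working directory is the legend-dataflow root; a typical invocation:

    cd legend-dataflow
    ./tests/runprod/test-argon-char-dataprod.sh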
6 changes: 2 additions & 4 deletions tests/runprod/test-evt.sh
@@ -24,9 +24,6 @@ rawfiles=(
cal/p03/r001/l200-p03-r001-cal-20230317T211819Z-tier_raw.lh5
cal/p03/r000/l200-p03-r000-cal-20230311T235840Z-tier_raw.lh5
cal/p03/r002/l200-p03-r002-cal-20230324T161401Z-tier_raw.lh5
anp/p13/r002/l200-p13-r002-anp-20241217T094846Z-tier_raw.lh5
anc/p13/r006/l200-p13-r006-anc-20241221T150249Z-tier_raw.lh5
acs/p13/r006/l200-p13-r006-acs-20241221T150307Z-tier_raw.lh5
)

(
@@ -47,4 +44,5 @@ _smk_opts=(
--workflow-profile workflow/profiles/default
)

run_test_command snakemake "${_smk_opts[@]}" "all-*-evt.gen" || exit 1
run_test_command snakemake "${_smk_opts[@]}" "all-p03-*-evt.gen" || exit 1
run_test_command snakemake "${_smk_opts[@]}" "all-p04-*-evt.gen" || exit 1
20 changes: 13 additions & 7 deletions workflow/Snakefile
@@ -35,10 +35,10 @@ basedir = workflow.basedir

time = datetime.now().strftime("%Y%m%dT%H%M%SZ")

if not Path(meta).exists():
LegendMetadata(meta).checkout(config.legend_metadata_version)

metadata = LegendMetadata(meta, lazy=True)
# NOTE: this will attempt a clone of legend-metadata if the directory does not exist
metadata = LegendMetadata(meta)
if "legend_metadata_version" in config:
metadata.checkout(config.legend_metadata_version)

part = CalGrouping(config, Path(det_status) / "cal_groupings.yaml")

@@ -51,12 +51,14 @@ wildcard_constraints:
timestamp=r"\d{8}T\d{6}Z",


include: "rules/channel_merge.smk"
include: "rules/filelist_gen.smk"
include: "rules/chanlist_gen.smk"
include: "rules/common.smk"
include: "rules/main.smk"
include: "rules/tcm.smk"
include: "rules/dsp_pars_geds.smk"
include: "rules/dsp_pars_spms.smk"
include: "rules/dsp.smk"
include: "rules/psp_pars_geds.smk"
include: "rules/psp.smk"
@@ -79,12 +81,16 @@ localrules:

onstart:
print("INFO: starting workflow")

# Make sure some packages are initialized before we begin to avoid race conditions
# https://numba.readthedocs.io/en/stable/developer/caching.html#cache-sharing
if not workflow.touch:
for pkg in ["dspeed", "lgdo", "matplotlib"]:
shell(execenv.execenv_pyexe(config, "python") + "-c 'import " + pkg + "'")
shell(
execenv.execenv_pyexe(config, "python")
+ "-c 'import dspeed, lgdo, matplotlib, pygama'"
)

# Log parameter catalogs in validity files
hit_par_cat_file = Path(utils.pars_path(config)) / "hit" / "validity.yaml"
if hit_par_cat_file.is_file():
hit_par_cat_file.unlink()
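Outside the execenv wrapper, the cache-warming step in onstart boils down to a single import of the heavy packages before any parallel job starts (sketch):

    python -c 'import dspeed, lgdo, matplotlib, pygama'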
7 changes: 5 additions & 2 deletions workflow/Snakefile-build-raw
@@ -12,6 +12,7 @@ from legenddataflow import patterns as patt
from legenddataflow import utils, execenv, ParsKeyResolve
from datetime import datetime
from dbetto import AttrsDict
from legendmeta import LegendMetadata

utils.subst_vars_in_snakemake_config(workflow, config)
config = AttrsDict(config)
@@ -26,8 +27,10 @@ meta = utils.metadata_path(config)

time = datetime.now().strftime("%Y%m%dT%H%M%SZ")

if not Path(meta_path).exists():
LegendMetadata(meta_path).checkout(config.legend_metadata_version)
# NOTE: this will attempt a clone of legend-metadata if the directory does not exist
metadata = LegendMetadata(meta_path, lazy=True)
if "legend_metadata_version" in config:
metadata.checkout(config.legend_metadata_version)


wildcard_constraints:
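Per the NOTE, constructing LegendMetadata clones the repository when meta_path is missing, and checkout then pins the configured version. A rough shell equivalent, with the clone URL as an assumption (it is not shown in this diff):

    # hypothetical URL; LegendMetadata resolves it internally
    git clone https://github.com/legend-exp/legend-metadata "$meta_path"
    git -C "$meta_path" checkout "$legend_metadata_version"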
28 changes: 22 additions & 6 deletions workflow/rules/chanlist_gen.smk
@@ -13,20 +13,23 @@ from legenddataflow import execenv_pyexe
from legenddataflow.utils import filelist_path


def get_chanlist(setup, keypart, workflow, config, det_status, chan_maps):
# FIXME: the system argument should always be explicitly supplied
def get_chanlist(
setup, keypart, workflow, config, det_status, chan_maps, system="geds"
):
key = ChannelProcKey.parse_keypart(keypart)

flist_path = filelist_path(setup)
os.makedirs(flist_path, exist_ok=True)
output_file = os.path.join(
flist_path,
f"all-{key.experiment}-{key.period}-{key.run}-cal-{key.timestamp}-channels.chankeylist.{random.randint(0,99999):05d}",
f"all-{key.experiment}-{key.period}-{key.run}-{key.datatype}-{key.timestamp}-channels.chankeylist.{random.randint(0,99999):05d}",
)

os.system(
execenv_pyexe(config, "create-chankeylist")
+ f"--det-status {det_status} --channelmap {chan_maps} --timestamp {key.timestamp} "
f"--datatype cal --output-file {output_file}"
f"--datatype {key.datatype} --output-file {output_file} --system {system}"
)

with open(output_file) as r:
@@ -36,12 +39,25 @@ def get_chanlist(setup, keypart, workflow, config, det_status, chan_maps):


def get_par_chanlist(
setup, keypart, tier, basedir, det_status, chan_maps, name=None, extension="yaml"
setup,
keypart,
tier,
basedir,
det_status,
chan_maps,
datatype="cal",
system="geds",
name=None,
extension="yaml",
):

chan_list = get_chanlist(setup, keypart, workflow, config, det_status, chan_maps)
chan_list = get_chanlist(
setup, keypart, workflow, config, det_status, chan_maps, system
)

par_pattern = get_pattern_pars_tmp_channel(setup, tier, name, extension)
par_pattern = get_pattern_pars_tmp_channel(
setup, tier, name, datatype=datatype, extension=extension
)

filenames = ChannelProcKey.get_channel_files(keypart, par_pattern, chan_list)

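With datatype and system now threaded through, the helper shells out roughly as follows for a SiPM channel list (the directory arguments and random suffix below are illustrative placeholders):

    create-chankeylist --det-status <det-status-dir> --channelmap <channelmap-dir> \
        --timestamp 20241217T094846Z --datatype anp \
        --output-file all-l200-p13-r002-anp-20241217T094846Z-channels.chankeylist.01234 \
        --system spms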
60 changes: 42 additions & 18 deletions workflow/rules/channel_merge.smk
@@ -1,15 +1,10 @@
from legenddataflow.patterns import (
get_pattern_pars_tmp_channel,
get_pattern_plts_tmp_channel,
get_pattern_plts,
get_pattern_tier,
get_pattern_pars_tmp,
get_pattern_pars,
)
from legenddataflow.utils import set_last_rule_name
import inspect

from legenddataflow import patterns
from legenddataflow.utils import set_last_rule_name
from legenddataflow.execenv import execenv_pyexe


def build_merge_rules(tier, lh5_merge=False, lh5_tier=None):
if lh5_tier is None:
lh5_tier = tier
@@ -24,7 +19,7 @@ def build_merge_rules(tier, lh5_merge=False, lh5_tier=None):
chan_maps,
),
output:
get_pattern_plts(config, tier),
patterns.get_pattern_plts(config, tier),
group:
f"merge-{tier}"
shell:
@@ -47,7 +42,7 @@ def build_merge_rules(tier, lh5_merge=False, lh5_tier=None):
extension="pkl",
),
output:
get_pattern_pars(
patterns.get_pattern_pars(
config,
tier,
name="objects",
@@ -76,7 +71,7 @@
),
output:
temp(
get_pattern_pars_tmp(
patterns.get_pattern_pars_tmp(
config,
tier,
datatype="cal",
@@ -91,6 +86,35 @@

set_last_rule_name(workflow, f"build_pars_{tier}_db")

rule:
"""Merge pars for SiPM channels in a single pars file."""
input:
lambda wildcards: get_par_chanlist(
config,
f"all-{wildcards.experiment}-{wildcards.period}-{wildcards.run}-{wildcards.datatype}-{wildcards.timestamp}-channels",
tier,
basedir,
det_status,
chan_maps,
datatype=wildcards.datatype,
system="spms"
),
output:
patterns.get_pattern_pars(
config,
tier,
name="spms",
datatype="{datatype}",
),
group:
f"merge-{tier}"
shell:
execenv_pyexe(config, "merge-channels") + \
"--input {input} "
"--output {output} "

set_last_rule_name(workflow, f"build_pars_spms_{tier}_db")
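After Snakemake expands {input} and {output}, the shell line of the new rule reduces to a plain merge-channels call; with two SiPM channel files it would look roughly like this (file names hypothetical):

    merge-channels --input <ch1-pars.yaml> <ch2-pars.yaml> --output <par_dsp_spms.yaml>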

rule:
input:
in_files=lambda wildcards: get_par_chanlist(
@@ -102,27 +126,27 @@
chan_maps,
extension="lh5" if lh5_merge is True else inspect.signature(get_par_chanlist).parameters['extension'].default,
),
in_db=get_pattern_pars_tmp(
in_db=patterns.get_pattern_pars_tmp(
config,
tier,
datatype="cal",
) if lh5_merge is True else [],
plts=get_pattern_plts(config, tier),
objects=get_pattern_pars(
plts=patterns.get_pattern_plts(config, tier),
objects=patterns.get_pattern_pars(
config,
tier,
name="objects",
extension="dir",
check_in_cycle=check_in_cycle,
),
output:
out_file=get_pattern_pars(
out_file=patterns.get_pattern_pars(
config,
tier,
extension="lh5" if lh5_merge is True else inspect.signature(get_pattern_pars).parameters['extension'].default,
extension="lh5" if lh5_merge is True else inspect.signature(patterns.get_pattern_pars).parameters['extension'].default,
check_in_cycle=check_in_cycle,
),
out_db=get_pattern_pars(config, tier, check_in_cycle=check_in_cycle) if lh5_merge is True else [],
out_db=patterns.get_pattern_pars(config, tier, check_in_cycle=check_in_cycle) if lh5_merge is True else [],
group:
f"merge-{tier}"
run: