Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add rule to build dsp pars for SiPM data #98

Merged
merged 23 commits into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ par-geds-psp-average = "legenddataflow.scripts.par.geds.psp.average:par_geds_
par-geds-raw-blindcal = "legenddataflow.scripts.par.geds.raw.blindcal:par_geds_raw_blindcal"
par-geds-raw-blindcheck = "legenddataflow.scripts.par.geds.raw.blindcheck:par_geds_raw_blindcheck"
par-geds-tcm-pulser = "legenddataflow.scripts.par.geds.tcm.pulser:par_geds_tcm_pulser"
par-spms-dsp-trg-thr = "legenddataflow.scripts.par.spms.dsp.trigger_threshold:par_spms_trg_thr"

[tool.uv.workspace]
exclude = ["generated", "inputs", "software", "workflow"]
Expand Down
2 changes: 2 additions & 0 deletions workflow/Snakefile
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,14 @@ wildcard_constraints:
timestamp=r"\d{8}T\d{6}Z",


include: "rules/channel_merge.smk"
include: "rules/filelist_gen.smk"
include: "rules/chanlist_gen.smk"
include: "rules/common.smk"
include: "rules/main.smk"
include: "rules/tcm.smk"
include: "rules/dsp_pars_geds.smk"
include: "rules/dsp_pars_spms.smk"
include: "rules/dsp.smk"
include: "rules/psp_pars_geds.smk"
include: "rules/psp.smk"
Expand Down
28 changes: 22 additions & 6 deletions workflow/rules/chanlist_gen.smk
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,23 @@ from legenddataflow import execenv_pyexe
from legenddataflow.utils import filelist_path


def get_chanlist(setup, keypart, workflow, config, det_status, chan_maps):
# FIXME: the system argument should always be explicitly supplied
def get_chanlist(
setup, keypart, workflow, config, det_status, chan_maps, system="geds"
):
key = ChannelProcKey.parse_keypart(keypart)

flist_path = filelist_path(setup)
os.makedirs(flist_path, exist_ok=True)
output_file = os.path.join(
flist_path,
f"all-{key.experiment}-{key.period}-{key.run}-cal-{key.timestamp}-channels.chankeylist.{random.randint(0,99999):05d}",
f"all-{key.experiment}-{key.period}-{key.run}-{key.datatype}-{key.timestamp}-channels.chankeylist.{random.randint(0,99999):05d}",
)

os.system(
execenv_pyexe(config, "create-chankeylist")
+ f"--det-status {det_status} --channelmap {chan_maps} --timestamp {key.timestamp} "
f"--datatype cal --output-file {output_file}"
f"--datatype {key.datatype} --output-file {output_file} --system {system}"
)

with open(output_file) as r:
Expand All @@ -36,12 +39,25 @@ def get_chanlist(setup, keypart, workflow, config, det_status, chan_maps):


def get_par_chanlist(
setup, keypart, tier, basedir, det_status, chan_maps, name=None, extension="yaml"
setup,
keypart,
tier,
basedir,
det_status,
chan_maps,
datatype="cal",
system="geds",
name=None,
extension="yaml",
):

chan_list = get_chanlist(setup, keypart, workflow, config, det_status, chan_maps)
chan_list = get_chanlist(
setup, keypart, workflow, config, det_status, chan_maps, system
)

par_pattern = get_pattern_pars_tmp_channel(setup, tier, name, extension)
par_pattern = get_pattern_pars_tmp_channel(
setup, tier, name, datatype=datatype, extension=extension
)

filenames = ChannelProcKey.get_channel_files(keypart, par_pattern, chan_list)

Expand Down
59 changes: 41 additions & 18 deletions workflow/rules/channel_merge.smk
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
from legenddataflow.patterns import (
get_pattern_pars_tmp_channel,
get_pattern_plts_tmp_channel,
get_pattern_plts,
get_pattern_tier,
get_pattern_pars_tmp,
get_pattern_pars,
)
from legenddataflow.utils import set_last_rule_name
import inspect

from legenddataflow import patterns
from legenddataflow.utils import set_last_rule_name
from legenddataflow.execenv import execenv_pyexe


def build_merge_rules(tier, lh5_merge=False, lh5_tier=None):
if lh5_tier is None:
lh5_tier = tier
Expand All @@ -24,7 +19,7 @@ def build_merge_rules(tier, lh5_merge=False, lh5_tier=None):
chan_maps,
),
output:
get_pattern_plts(config, tier),
patterns.get_pattern_plts(config, tier),
group:
f"merge-{tier}"
shell:
Expand All @@ -47,7 +42,7 @@ def build_merge_rules(tier, lh5_merge=False, lh5_tier=None):
extension="pkl",
),
output:
get_pattern_pars(
patterns.get_pattern_pars(
config,
tier,
name="objects",
Expand Down Expand Up @@ -76,7 +71,7 @@ def build_merge_rules(tier, lh5_merge=False, lh5_tier=None):
),
output:
temp(
get_pattern_pars_tmp(
patterns.get_pattern_pars_tmp(
config,
tier,
datatype="cal",
Expand All @@ -91,6 +86,34 @@ def build_merge_rules(tier, lh5_merge=False, lh5_tier=None):

set_last_rule_name(workflow, f"build_pars_{tier}_db")

    rule:
        """Merge pars for SiPM channels in a single pars file."""
        input:
            # per-channel pars files for all SiPM ("spms") channels of this
            # run/datatype; chanlist is resolved at DAG-build time
            lambda wildcards: get_par_chanlist(
                config,
                f"all-{wildcards.experiment}-{wildcards.period}-{wildcards.run}-{wildcards.datatype}-{wildcards.timestamp}-channels",
                tier,
                basedir,
                det_status,
                chan_maps,
                datatype=wildcards.datatype,
                system="spms"
            ),
        output:
            # single merged pars file, keyed by datatype (unlike the geds
            # merge rules above, which are cal-only)
            patterns.get_pattern_pars(
                config,
                tier,
                datatype="{datatype}",
            ),
        group:
            f"merge-{tier}"
        shell:
            execenv_pyexe(config, "merge-channels") + \
            "--input {input} "
            "--output {output} "

    # give the anonymous rule a deterministic name, e.g. build_pars_spms_dsp_db
    set_last_rule_name(workflow, f"build_pars_spms_{tier}_db")

rule:
input:
in_files=lambda wildcards: get_par_chanlist(
Expand All @@ -102,27 +125,27 @@ def build_merge_rules(tier, lh5_merge=False, lh5_tier=None):
chan_maps,
extension="lh5" if lh5_merge is True else inspect.signature(get_par_chanlist).parameters['extension'].default,
),
in_db=get_pattern_pars_tmp(
in_db=patterns.get_pattern_pars_tmp(
config,
tier,
datatype="cal",
) if lh5_merge is True else [],
plts=get_pattern_plts(config, tier),
objects=get_pattern_pars(
plts=patterns.get_pattern_plts(config, tier),
objects=patterns.get_pattern_pars(
config,
tier,
name="objects",
extension="dir",
check_in_cycle=check_in_cycle,
),
output:
out_file=get_pattern_pars(
out_file=patterns.get_pattern_pars(
config,
tier,
extension="lh5" if lh5_merge is True else inspect.signature(get_pattern_pars).parameters['extension'].default,
extension="lh5" if lh5_merge is True else inspect.signature(patterns.get_pattern_pars).parameters['extension'].default,
check_in_cycle=check_in_cycle,
),
out_db=get_pattern_pars(config, tier, check_in_cycle=check_in_cycle) if lh5_merge is True else [],
out_db=patterns.get_pattern_pars(config, tier, check_in_cycle=check_in_cycle) if lh5_merge is True else [],
group:
f"merge-{tier}"
run:
Expand Down
22 changes: 7 additions & 15 deletions workflow/rules/dsp.smk
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,7 @@ Snakemake rules for processing dsp tier.
from legenddataflow.pars_loading import ParsCatalog
from legenddataflow.create_pars_keylist import ParsKeyResolve
from pathlib import Path
from legenddataflow.patterns import (
get_pattern_plts,
get_pattern_tier,
get_pattern_pars_tmp,
get_pattern_log,
get_pattern_pars,
)
from legenddataflow import patterns as patt
from legenddataflow.execenv import execenv_pyexe

dsp_par_catalog = ParsKeyResolve.get_par_catalog(
Expand All @@ -22,30 +16,27 @@ dsp_par_catalog = ParsKeyResolve.get_par_catalog(
{"cal": ["par_dsp"], "lar": ["par_dsp"]},
)


include: "channel_merge.smk"


build_merge_rules("dsp", lh5_merge=True)


rule build_dsp:
input:
raw_file=get_pattern_tier(config, "raw", check_in_cycle=False),
raw_file=patt.get_pattern_tier(config, "raw", check_in_cycle=False),
pars_file=ancient(
lambda wildcards: dsp_par_catalog.get_par_file(
config, wildcards.timestamp, "dsp"
)
),
pars_file_spms=patt.get_pattern_pars(config, "dsp", datatype="{datatype}"),
params:
timestamp="{timestamp}",
datatype="{datatype}",
ro_input=lambda _, input: {k: ro(v) for k, v in input.items()},
output:
tier_file=get_pattern_tier(config, "dsp", check_in_cycle=check_in_cycle),
db_file=get_pattern_pars_tmp(config, "dsp_db"),
tier_file=patt.get_pattern_tier(config, "dsp", check_in_cycle=check_in_cycle),
db_file=patt.get_pattern_pars_tmp(config, "dsp_db"),
log:
get_pattern_log(config, "tier_dsp", time),
patt.get_pattern_log(config, "tier_dsp", time),
group:
"tier-dsp"
resources:
Expand All @@ -62,3 +53,4 @@ rule build_dsp:
"--output {output.tier_file} "
"--db-file {output.db_file} "
"--pars-file {params.ro_input[pars_file]} "
"--pars-file_spms {params.ro_input[pars_file_spms]} "
49 changes: 49 additions & 0 deletions workflow/rules/dsp_pars_spms.smk
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""DSP parameter generation for SiPM data"""

from pathlib import Path

from legenddataflow import patterns as patt
from legenddataflow import utils, execenv_pyexe


# Build DSP trigger-threshold parameters for a single SiPM channel.
# Runs the "par-spms-dsp-trg-thr" entry point over the raw-tier file list and
# writes a per-channel temporary pars file for the dsp tier.
rule build_pars_dsp_tau_spms:
    input:
        # raw-tier file list for this data-taking run / datatype
        filelist=Path(utils.filelist_path(config))
        / "all-{experiment}-{period}-{run}-{datatype}-raw.filelist",
        # existing dsp parameter database used as overwrite/baseline input
        pardb=lambda wildcards: get_overwrite_file("dsp", wildcards),
    params:
        timestamp="{timestamp}",
        datatype="{datatype}",
        channel="{channel}",
        # fully qualified raw-tier table name for this channel, resolved
        # from the metadata/channel map at the given timestamp
        raw_table_name=lambda wildcards: get_table_name(
            metadata,
            config,
            wildcards.datatype,
            wildcards.timestamp,
            wildcards.channel,
            "raw",
        ),
    wildcard_constraints:
        # any datatype except "cal" and "xtc" — presumably physics/lar-style
        # runs only; confirm against the dsp par catalog above
        datatype=r"\b(?!cal\b|xtc\b)\w+\b",
    output:
        # temporary per-channel pars file; merged later by channel_merge rules
        temp(
            patt.get_pattern_pars_tmp_channel(
                config, "dsp", "spms_trigger_threshold", datatype="{datatype}"
            )
        ),
    log:
        patt.get_pattern_log_channel(
            config, "spms_trigger_threshold", time, datatype="{datatype}"
        ),
    group:
        "par-dsp"
    shell:
        execenv_pyexe(config, "par-spms-dsp-trg-thr") + "--config-path {configs} "
        "--raw-files {input.filelist} "
        "--dsp-db {input.pardb} "
        "--datatype {params.datatype} "
        "--timestamp {params.timestamp} "
        "--sipm-name {params.channel} "
        "--raw-table-name {params.raw_table_name} "
        "--output-file {output} "
        "--logfile {log} "
12 changes: 12 additions & 0 deletions workflow/src/legenddataflow/cfgtools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from typing import Mapping


def get_channel_config(
    mapping: Mapping, channel: str, default_key: str = "__default__"
):
    """Get channel key from mapping with default.

    Returns the value at key `channel`, if existing, otherwise return value at
    `default_key`.

    Parameters
    ----------
    mapping
        mapping of channel names (or the default key) to configuration values.
    channel
        channel name to look up.
    default_key
        fallback key used when `channel` is not present in `mapping`.

    Raises
    ------
    KeyError
        if `channel` is missing from `mapping` and `default_key` is missing too.
    """
    # NOTE: do not use mapping.get(channel, mapping[default_key]) here — the
    # default argument is evaluated eagerly, which would raise KeyError when
    # default_key is absent even though `channel` itself is present.
    if channel in mapping:
        return mapping[channel]
    return mapping[default_key]
1 change: 0 additions & 1 deletion workflow/src/legenddataflow/execenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,6 @@ def _runcmd(cmd_expr, cmd_env, **kwargs):
"install",
"--prefix",
path_install,
"--",
str(config_loc),
]
if args.editable:
Expand Down
Loading
Loading