Skip to content


analysis_toolkit;Added pre- selection function as a start
Browse files Browse the repository at this point in the history
  • Loading branch information
anupama-reghunath committed Nov 28, 2024
1 parent 0a4b703 commit 6d9d3ed
Showing 1 changed file with 241 additions and 0 deletions.
241 changes: 241 additions & 0 deletions python/
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
#!/usr/bin/env python
"""Toolkit for Analysis."""

import numpy as np
import ROOT
import shipunit as u
import yaml
from rootpyPickler import Unpickler
from ShipGeoConfig import AttrDict

class selection_check:
"""Class to perform various selection checks on the candidate."""

def __init__(self, g):
"""Initialize the selection_check class with geometry and configuration."""
geo_file = ROOT.TFile.Open(g, "read")
self.geometry_manager = geo_file.FAIRGeom
unpickler = Unpickler(geo_file)
self.ship_geo = unpickler.load("ShipGeo")

fairship = ROOT.gSystem.Getenv("FAIRSHIP")

if self.ship_geo.DecayVolumeMedium == "helium":
with open(fairship + "/geometry/veto_config_helium.yaml", "r") as file:
config = yaml.safe_load(file)
self.veto_geo = AttrDict(config)
if self.ship_geo.DecayVolumeMedium == "vacuums":
with open(fairship + "/geometry/veto_config_vacuums.yaml", "r") as file:
config = yaml.safe_load(file)
self.veto_geo = AttrDict(config)

def access_event(self, tree):
"""Access event data."""
self.tree = tree

def define_candidate_time(self, candidate):
"""Calculate time associated with the candidate decay vertex using strawtubes MCPoint info."""
t0 = self.tree.ShipEventHeader.GetEventTime()
candidate_pos = ROOT.TVector3()

d1, d2 = candidate.GetDaughter(0), candidate.GetDaughter(1)
d1_mc, d2_mc = self.tree.fitTrack2MC[d1], self.tree.fitTrack2MC[d2]

time_vtx_from_strawhits = []

for hit in self.tree.strawtubesPoint:
if not (
int(str(hit.GetDetectorID())[:1]) == 1
or int(str(hit.GetDetectorID())[:1]) == 2

if not (hit.GetTrackID() == d1_mc or hit.GetTrackID() == d2_mc):

t_straw = hit.GetTime()

dist = np.sqrt(
(candidate_pos.X() - hit.GetX()) ** 2
+ (candidate_pos.Y() - hit.GetY()) ** 2
+ (candidate_pos.Z() - hit.GetZ()) ** 2
) # distance to the vertex #in cm

d_mom = self.tree.MCTrack[hit.GetTrackID()].GetP() / u.GeV
mass = self.tree.MCTrack[hit.GetTrackID()].GetMass()
v = u.c_light * d_mom / np.sqrt(d_mom**2 + (mass) ** 2)

t_vertex = t_straw - (dist / v)


t_vtx = np.average(time_vtx_from_strawhits) + t0

return t_vtx # units in ns

def impact_parameter(self, candidate):
"""Calculate the impact parameter of the candidate relative to (0,0,target.z0)."""
candidate_pos = ROOT.TVector3()

candidate_mom = ROOT.TLorentzVector()
target_point = ROOT.TVector3(0, 0,

projection_factor = 0
if hasattr(candidate_mom, "P"):
P = candidate_mom.P()
P = candidate_mom.Mag()
for i in range(3):
projection_factor += (
candidate_mom(i) / P * (target_point(i) - candidate_pos(i))

dist = 0
for i in range(3):
dist += (
- candidate_pos(i)
- projection_factor * candidate_mom(i) / P
) ** 2
dist = ROOT.TMath.Sqrt(dist)

return dist # in cm

def dist_to_innerwall(self, candidate):
"""Calculate the minimum distance(in XY plane) of the candidate decay vertex to the inner wall of the decay vessel. If outside the decay volume, or if distance > 100cm,Return 0."""
candidate_pos = ROOT.TVector3()
position = (candidate_pos.X(), candidate_pos.Y(), candidate_pos.Z())

nsteps = 8
dalpha = 2 * ROOT.TMath.Pi() / nsteps
min_distance = float("inf")

node = self.geometry_manager.FindNode(*position)
if not node:
return 0 # is outside the decay volume

# Loop over directions in the XY plane
for n in range(nsteps):
alpha = n * dalpha
direction = (
) # Direction vector in XY plane
self.geometry_manager.InitTrack(*position, *direction)
if not self.geometry_manager.FindNextBoundary():
# Get the distance to the boundary and update the minimum distance
distance = self.geometry_manager.GetStep()
min_distance = min(min_distance, distance)

return min_distance if min_distance < 100 * u.m else 0

def dist_to_entrance_vessel(self, candidate):
"""Calculate the distance of the candidate decay vertex to the entrance of the decay vessel."""
candidate_pos = ROOT.TVector3()
return candidate_pos.Z() - self.veto_geo.z0

def nDOF(self, candidate):
"""Return the number of degrees of freedom (nDOF) for the particle's daughter tracks."""
flag = True
nmeas = []
t1, t2 = candidate.GetDaughter(0), candidate.GetDaughter(1)
ndf_cut = 25 # cut on nDOF
for tr in [t1, t2]:
fit_status = self.tree.FitTracks[tr].getFitStatus()
) # nmeas.append(fit_status.getNdf())
if nmeas[-1] <= ndf_cut:
flag = False # dof<=25=False
return nmeas, flag

def daughtermomentum(self, candidate):
"""Return the momentum(Mag) of the particle's daughter tracks."""
flag = True
daughter_mom = []
t1, t2 = candidate.GetDaughter(0), candidate.GetDaughter(1)
for trD in [t1, t2]:
x = self.tree.FitTracks[trD]
xx = x.getFittedState()
if daughter_mom[-1] < 1 * u.GeV:
flag = False
return daughter_mom, flag

def invariant_mass(self, candidate):
"""Invariant mass of the candidate."""
return candidate.GetMass()

def DOCA(self, candidate):
"""Distance of Closest Approach."""
return candidate.GetDoca()

def is_in_fiducial(self, candidate):
"""Check if the candidate decay vertex is within the Fiducial Volume."""
candidate_pos = ROOT.TVector3()

if candidate_pos.Z() > self.ship_geo.TrackStation1.z:
return False
if candidate_pos.Z() < self.veto_geo.z0:
return False

# if self.dist2InnerWall(candidate)<=5* return False

vertex_node = ROOT.gGeoManager.FindNode(
candidate_pos.X(), candidate_pos.Y(), candidate_pos.Z()
vertex_elem = vertex_node.GetVolume().GetName()
if not vertex_elem.startswith("DecayVacuum_"):
return False
return True

def chi2nDOF(self, candidate):
"""Return the reduced chi^2 of the particle's daughter tracks."""
t1, t2 = candidate.GetDaughter(0), candidate.GetDaughter(1)
# cut on chi2nDOF
flag = True
chi2ndf_cut = 5
chi2ndf = []
for tr in [t1, t2]:
fit_status = self.tree.FitTracks[tr].getFitStatus()
chi2ndf.append(fit_status.getChi2() / fit_status.getNdf())
if chi2ndf[-1] >= chi2ndf_cut:
flag = False
return chi2ndf, flag

def preselection_cut(self, candidate, IP_cut=250):
"""Umbrella method to apply the pre-selection cuts on the candidate."""
if len(self.tree.Particles) != 1:
return False
if self.dist_to_innerwall(candidate) <= 5 *
return False
if not (self.is_in_fiducial(candidate)):
return False
if self.dist_to_entrance_vessel(candidate) < 100 *
return False
if self.impact_parameter(candidate) > IP_cut:
return False
nmeas, flag = self.nDOF(candidate)
if not (flag):
return False
chi2ndf, flag = self.chi2nDOF(candidate)
if not (flag):
return False
d_mom, flag = self.daughtermomentum(candidate)
if not (flag):
return False
if self.DOCA(candidate) >= 1 *
return False

return True

0 comments on commit 6d9d3ed

Please sign in to comment.