-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathFilterFrame.py
More file actions
119 lines (92 loc) · 4.29 KB
/
FilterFrame.py
File metadata and controls
119 lines (92 loc) · 4.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
from icecube import icetray, dataclasses, simclasses
class FilterFrame(icetray.I3Module):
def __init__(self, ctx):
super(FilterFrame, self).__init__(ctx)
self.AddParameter("AllowedStrings", "List of allowed string IDs", [])
def Configure(self):
self.photon_map_key = "I3Photons"
self.allowed_strings = set(self.GetParameter("AllowedStrings")) # Convert to set for fast lookup
def replaceOMGeo(self, frame):
omgeo_map = frame["I3OMGeoMap"]
filtered_omgeo_map = dataclasses.I3OMGeoMap()
for omkey, omgeo in omgeo_map.items():
if omkey.string in self.allowed_strings:
filtered_omgeo_map[omkey] = omgeo
frame.Replace("I3OMGeoMap",filtered_omgeo_map)
newGeo = dataclasses.I3Geometry()
newGeo.start_time = frame["I3Geometry"].start_time
newGeo.end_time = frame["I3Geometry"].end_time
newGeo.omgeo = filtered_omgeo_map
frame.Replace("I3Geometry",newGeo)
return frame
def replaceModGeo(self, frame):
modgeo_map = frame["I3ModuleGeoMap"]
filtered_modgeo_map = dataclasses.I3ModuleGeoMap()
for modkey, modgeo in modgeo_map.items():
if modkey.string in self.allowed_strings:
filtered_modgeo_map[modkey] = modgeo
frame.Replace("I3ModuleGeoMap",filtered_modgeo_map)
return frame
def replaceSubdet(self,frame):
subdet = frame["Subdetectors"]
filtered_det = dataclasses.I3MapModuleKeyString()
for modkey, string in subdet.items():
if modkey.string in self.allowed_strings:
filtered_det[modkey] = string
frame.Replace("Subdetectors",filtered_det)
return frame
def replaceCal(self,frame):
ical = frame["I3Calibration"]
filtered_domcal = dataclasses.Map_OMKey_I3DOMCalibration()
for modkey, cal in ical.dom_cal.items():
if modkey.string in self.allowed_strings:
filtered_domcal[modkey] = cal
newcal = dataclasses.I3Calibration()
newcal.dom_cal = filtered_domcal
newcal.start_time = ical.start_time
newcal.end_time = ical.end_time
newcal.vem_cal = ical.vem_cal
frame.Replace("I3Calibration",newcal)
return frame
def replaceStatus(self,frame):
idet = frame["I3DetectorStatus"]
filtered_domstat = dataclasses.Map_OMKey_I3DOMStatus()
for modkey, stat in idet.dom_status.items():
if modkey.string in self.allowed_strings:
filtered_domstat[modkey] = stat
newdet = dataclasses.I3DetectorStatus()
newdet.dom_status = filtered_domstat
newdet.start_time = idet.start_time
newdet.end_time = idet.end_time
newdet.daq_configuration_name = idet.daq_configuration_name
newdet.trigger_status = idet.trigger_status
frame.Replace("I3DetectorStatus",newdet)
return frame
def Geometry(self, frame):
frame = self.replaceOMGeo(frame)
frame = self.replaceModGeo(frame)
frame = self.replaceSubdet(frame)
self.PushFrame(frame)
def Calibration(self, frame):
frame = self.replaceOMGeo(frame)
frame = self.replaceModGeo(frame)
frame = self.replaceSubdet(frame)
frame = self.replaceCal(frame)
self.PushFrame(frame)
def DetectorStatus(self, frame):
frame = self.replaceOMGeo(frame)
frame = self.replaceModGeo(frame)
frame = self.replaceSubdet(frame)
frame = self.replaceCal(frame)
frame = self.replaceStatus(frame)
self.PushFrame(frame)
def DAQ(self,frame):
if self.photon_map_key in frame:
photon_map = frame[self.photon_map_key] #get photon list
filtered_map = simclasses.I3CompressedPhotonSeriesMap() #make empty list
for module_key, photon_vector in photon_map.items():
if module_key.string in self.allowed_strings:
filtered_map[module_key] = photon_vector
if(filtered_map): #only write frames with photons
frame.Replace(self.photon_map_key,filtered_map)
self.PushFrame(frame)