Commit df4a9864 authored by Duncan Meacher

gstlal_etg: Added trigger start and stop times. Bug in trigger times from trigsegsdict

parent 3ecef835
@@ -44,7 +44,6 @@ GObject.threads_init()
 Gst.init(None)
 import lal
-from confluent_kafka import Producer
 import numpy
 import pandas
@@ -325,8 +324,9 @@ class MultiChannelHandler(simplehandler.Handler):
 			# declaration then terrible terrible things
 			# will happen
 			if mapinfo.data:
-				for row in sngltriggertable.GSTLALSnglTrigger.from_buffer(mapinfo.data):
-					self.process_row(channel, rate, buftime, row)
+				if buftime >= int(options.trigger_start_time) and buftime < int(options.trigger_end_time):
+					for row in sngltriggertable.GSTLALSnglTrigger.from_buffer(mapinfo.data):
+						self.process_row(channel, rate, buftime, row)
 			memory.unmap(mapinfo)
 			del buf
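
The guard added above gates output on the buffer's GPS timestamp: rows are processed only when buftime falls in the half-open window [trigger-start-time, trigger-end-time). A minimal sketch of the same predicate (the helper name is hypothetical; the int() casts mirror the patch, since the options arrive as strings):

def in_trigger_window(buftime, trigger_start_time, trigger_end_time):
	# The window is half-open: a buffer stamped exactly at the end time is excluded.
	return int(trigger_start_time) <= buftime < int(trigger_end_time)

# For a window [1187000000, 1187000512):
assert in_trigger_window(1187000511, "1187000000", "1187000512")
assert not in_trigger_window(1187000512, "1187000000", "1187000512")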
@@ -442,19 +442,20 @@ class MultiChannelHandler(simplehandler.Handler):
 		# NOTE
 		# This method should only be called by an instance that is locked.
 		# Use T050017 filenaming convention.
-		fname = '%s-%d-%d.%s' % (self.tag, idq_aggregator.floor_div(self.last_save_time, self.cadence), self.cadence, "trg")
-		path = os.path.join(self.out_path, self.tag, self.tag+"-"+str(fname.split("-")[2])[:5])
-		fpath = os.path.join(path, fname)
-		tmpfile = fpath+"~"
-		try:
-			os.makedirs(path)
-		except OSError:
-			pass
-		with open(tmpfile, 'w') as f:
-			f.write(''.join(self.fdata))
-		shutil.move(tmpfile, fpath)
-		latency = numpy.round(int(aggregator.now()) - buftime)
-		print >>sys.stdout, "buftime = %d, latency at write stage = %d" % (buftime, latency)
+		if len(self.fdata) > 1 :
+			fname = '%s-%d-%d.%s' % (self.tag, idq_aggregator.floor_div(self.last_save_time, self.cadence), self.cadence, "trg")
+			path = os.path.join(self.out_path, self.tag, self.tag+"-"+str(fname.split("-")[2])[:5])
+			fpath = os.path.join(path, fname)
+			tmpfile = fpath+"~"
+			try:
+				os.makedirs(path)
+			except OSError:
+				pass
+			with open(tmpfile, 'w') as f:
+				f.write(''.join(self.fdata))
+			shutil.move(tmpfile, fpath)
+			latency = numpy.round(int(aggregator.now()) - buftime)
+			print >>sys.stdout, "buftime = %d, latency at write stage = %d" % (buftime, latency)
 
 	def gen_psd_xmldoc(self):
 		xmldoc = lal.series.make_psd_xmldoc(self.psds)
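
The write path itself is unchanged; it is now simply wrapped in if len(self.fdata) > 1 :, which skips the write when self.fdata holds nothing past its first entry (presumably a header line; the contents of fdata are not shown in this hunk). The filename follows the T050017 convention, TAG-GPSSTART-DURATION.trg, and files are grouped into per-epoch directories keyed by the first five digits of GPSSTART. A sketch of the naming logic under those assumptions, with idq_aggregator.floor_div replaced by its integer-arithmetic equivalent:

import os

def trigger_file_path(out_path, tag, last_save_time, cadence):
	# floor_div(t, c) == (t // c) * c: round the save time down to a cadence boundary
	start = (int(last_save_time) // cadence) * cadence
	fname = '%s-%d-%d.trg' % (tag, start, cadence)
	# fname.split("-")[2][:5] as in the patch; this indexing assumes the tag
	# itself contains exactly one dash, e.g. "H1-ETG"
	gps_prefix = fname.split("-")[2][:5]
	return os.path.join(out_path, tag, tag + "-" + gps_prefix, fname)

# trigger_file_path("/data", "H1-ETG", 1187000123, 100)
#   -> "/data/H1-ETG/H1-ETG-11870/H1-ETG-1187000100-100.trg"

Writing to fpath+"~" and then shutil.move-ing it into place means readers never see a half-written file; within one filesystem the move is an atomic rename.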
@@ -608,12 +609,22 @@ def parse_command_line():
 	parser.add_option("-l", "--latency", action = "store_true", help = "Print latency to output ascii file. Temporary.")
 	parser.add_option("--save-hdf", action = "store_true", default = False, help = "If set, will save hdf5 files to disk straight from dataframe once every cadence")
+	parser.add_option("--trigger-start-time", metavar = "seconds", help = "Set the start time of the segment to output triggers in GPS seconds. Required unless --data-source=lvshm")
+	parser.add_option("--trigger-end-time", metavar = "seconds", help = "Set the end time of the segment to output triggers in GPS seconds. Required unless --data-source=lvshm")
 
 	#
 	# parse the arguments and sanity check
 	#
 
 	options, filenames = parser.parse_args()
 
+	# Sanity check the options
+	if options.trigger_start_time is None :
+		options.trigger_start_time = options.gps_start_time
+	if options.trigger_end_time is None :
+		options.trigger_end_time = options.gps_end_time
 
 	return options, filenames
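
An unset trigger bound falls back to the corresponding analysis bound, so offline runs that omit the new flags behave exactly as before. Note that neither option is declared with type = "int", so optparse stores strings and the handler casts with int() on every buffer; a sketch that normalizes once at parse time instead (hypothetical helper, same fallback logic):

def resolve_trigger_window(options):
	# Mirror the sanity check above, then cast once instead of per buffer.
	start = options.trigger_start_time if options.trigger_start_time is not None else options.gps_start_time
	end = options.trigger_end_time if options.trigger_end_time is not None else options.gps_end_time
	return int(start), int(end)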
@@ -636,6 +647,9 @@ channels = data_source_info.channel_dict.keys()
 # dictionary of basis parameters keyed by ifo, rate
 basis_params = {}
 
+if options.use_kafka:
+	from confluent_kafka import Producer
+
 if not options.disable_web_service:
 	#
 	# create a new, empty, Bottle application and make it the current
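
Moving the Producer import under if options.use_kafka: makes confluent_kafka an optional dependency; hosts that never enable Kafka output no longer need the package. A sketch of the same deferred-import pattern with an explicit failure message (the helper and wording are hypothetical):

def get_kafka_producer_class(use_kafka):
	# Only demand confluent_kafka when Kafka output is actually requested.
	if not use_kafka:
		return None
	try:
		from confluent_kafka import Producer
	except ImportError:
		raise ImportError("--use-kafka requires the confluent-kafka package")
	return Producer

The remaining hunks are against the DAG-generator script (the one that assembles the etg_trigger_pipe workflow near the end of this diff), teaching it to hand each gstlal_etg job a trigger window alongside its analysis window.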
@@ -56,7 +56,6 @@ lsctables.use_in(LIGOLWContentHandler)
 #
 # get a dictionary of all the segments
 #
 
-#def analysis_segments(analyzable_instruments_set, allsegs, boundary_seg, max_template_length, min_instruments = 2):
 def analysis_segments(ifo, allsegs, boundary_seg, max_template_length = 100): # FIXME Set proper
 	segsdict = segments.segmentlistdict()
 	# 512 seconds for the whitener to settle + the maximum template_length FIXME don't hard code
@@ -64,7 +63,6 @@ def analysis_segments(ifo, allsegs, boundary_seg, max_template_length = 100): # FIXME Set proper
 	# Chosen so that the overlap is only a ~5% hit in run time for long segments...
 	segment_length = int(10 * start_pad)
 
-	#segsdict[ifo] &= segments.segmentlist([boundary_seg])
 	segsdict[ifo] = segments.segmentlist([boundary_seg])
 	segsdict[ifo] = segsdict[ifo].protract(start_pad)
 	segsdict[ifo] = gstlaldagparts.breakupsegs(segsdict[ifo], segment_length, start_pad)
@@ -73,16 +71,29 @@ def analysis_segments(ifo, allsegs, boundary_seg, max_template_length = 100): # FIXME Set proper
 
 	return segsdict
 
+def trigger_segments(ifo, allsegs, boundary_seg, max_template_length = 100): # FIXME Set proper
+	trigsegsdict = segments.segmentlistdict()
+	# 512 seconds for the whitener to settle + the maximum template_length FIXME don't hard code
+	start_pad = 512 + max_template_length # FIXME set start_pad to be imported value
+	# Chosen so that the overlap is only a ~5% hit in run time for long segments...
+	segment_length = int(10 * start_pad)
+	trigsegsdict[ifo] = segments.segmentlist([boundary_seg])
+	trigsegsdict[ifo] = gstlaldagparts.breakupsegs(trigsegsdict[ifo], segment_length, start_pad)
+	if not trigsegsdict[ifo]:
+		del trigsegsdict[ifo]
+
+	return trigsegsdict
+
 #
 # get a dictionary of all the channels per gstlal_etg job
 #
 
-def etg_node_gen(gstlalETGJob, dag, parent_nodes, segsdict, ifo, options, channels, data_source_info):
+def etg_node_gen(gstlalETGJob, dag, parent_nodes, segsdict, trigsegsdict, ifo, options, channels, data_source_info):
 	etg_nodes = {}
 	cumsum_rates = 0
 	total_rates = 0
 	outstr = ""
 	out_index = 0
 	n_channels = 0
 	n_cpu = 0
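
trigger_segments is analysis_segments minus the protract(start_pad) step: analysis segments are padded so the whitener can settle and templates fit, while trigger windows are not, so each trigger segment should be the interior of its padded counterpart. The next hunk pairs the two lists with zip(), which silently assumes breakupsegs() splits the padded and unpadded lists into equally many pieces in the same order; if the two calls ever split differently, windows are mispaired or dropped, which is plausibly the "bug in trigger times from trigsegsdict" flagged in the commit message. A small runnable check of the pairing invariant, assuming the ligo.segments API (glue.segments on older installs) and illustrative GPS times:

from ligo import segments

start_pad = 512 + 100  # whitener settling + max template length, as in the patch

boundary = segments.segment(1187000000, 1187010000)
analysis = segments.segmentlist([boundary]).protract(start_pad)
trigger = segments.segmentlist([boundary])

# zip() drops unmatched tails silently, so check the lengths explicitly first.
assert len(analysis) == len(trigger)
for seg, trigseg in zip(analysis, trigger):
	# Each trigger window must lie inside its padded analysis segment.
	assert seg[0] <= trigseg[0] and trigseg[1] <= seg[1]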
@@ -104,9 +115,10 @@ def etg_node_gen(gstlalETGJob, dag, parent_nodes, segsdict, ifo, options, channels, data_source_info):
 	print "Total jobs needed =", n_cpu
 	print "Evenly distributed streams per job =", int(n_streams)
 
-	for seg in segsdict[ifo]:
+	for seg, trigseg in zip(segsdict[ifo], trigsegsdict[ifo]):
 		cumsum_rates = 0
 		out_index = 0
 		for ii, channel in enumerate(channels,1):
 			n_channels += 1
@@ -129,6 +141,8 @@ def etg_node_gen(gstlalETGJob, dag, parent_nodes, segsdict, ifo, options, channels, data_source_info):
 			inspiral_pipe.generic_node(gstlalETGJob, dag, parent_nodes = parent_nodes,
 				opts = {"gps-start-time":int(seg[0]),
 					"gps-end-time":int(seg[1]),
+					"trigger-start-time":int(trigseg[0]),
+					"trigger-end-time":int(trigseg[1]),
 					"data-source":"frames",
 					"channel-name":outstr,
 					"mismatch":options.mismatch,
@@ -219,12 +233,13 @@ dag = inspiral_pipe.DAG("etg_trigger_pipe")
 gstlalETGJob = inspiral_pipe.generic_job("gstlal_etg", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}))
 
 segsdict = analysis_segments(ifo, data_source_info.frame_segments, boundary_seg, max_template_length)
+trigsegsdict = trigger_segments(ifo, data_source_info.frame_segments, boundary_seg, max_template_length)
 
 #
 # ETG jobs
 #
 
-etg_nodes = etg_node_gen(gstlalETGJob, dag, [], segsdict, ifo, options, channels, data_source_info)
+etg_nodes = etg_node_gen(gstlalETGJob, dag, [], segsdict, trigsegsdict, ifo, options, channels, data_source_info)
 
 #
 # all done