diff --git a/gstlal-ugly/bin/gstlal_etg b/gstlal-ugly/bin/gstlal_etg
index 6ad3fbd1e98df1647793feee48e0be016bfa77ed..9fdecc2cf9d649f7cc3f97ffac2b41527b63c085 100755
--- a/gstlal-ugly/bin/gstlal_etg
+++ b/gstlal-ugly/bin/gstlal_etg
@@ -44,7 +44,6 @@ GObject.threads_init()
 Gst.init(None)
 
 import lal
-from confluent_kafka import Producer
 import numpy
 import pandas
 
@@ -325,8 +324,9 @@ class MultiChannelHandler(simplehandler.Handler):
 			# declaration then terrible terrible things
 			# will happen
 			if mapinfo.data:
-				for row in sngltriggertable.GSTLALSnglTrigger.from_buffer(mapinfo.data):
-					self.process_row(channel, rate, buftime, row)
+				if buftime >= int(options.trigger_start_time) and buftime < int(options.trigger_end_time):
+					for row in sngltriggertable.GSTLALSnglTrigger.from_buffer(mapinfo.data):
+						self.process_row(channel, rate, buftime, row)
 			memory.unmap(mapinfo)
 		del buf
 
@@ -442,19 +442,20 @@ class MultiChannelHandler(simplehandler.Handler):
 		# NOTE
 		# This method should only be called by an instance that is locked.
 		# Use T050017 filenaming convention.
-		fname = '%s-%d-%d.%s' % (self.tag, idq_aggregator.floor_div(self.last_save_time, self.cadence), self.cadence, "trg")
-		path = os.path.join(self.out_path, self.tag, self.tag+"-"+str(fname.split("-")[2])[:5])
-		fpath = os.path.join(path, fname)
-		tmpfile = fpath+"~"
-		try:
-			os.makedirs(path)
-		except OSError:
-			pass
-		with open(tmpfile, 'w') as f:
-			f.write(''.join(self.fdata))
-		shutil.move(tmpfile, fpath)
-		latency = numpy.round(int(aggregator.now()) - buftime)
-		print >>sys.stdout, "buftime = %d, latency at write stage = %d" % (buftime, latency)
+		if len(self.fdata) > 1 :
+			fname = '%s-%d-%d.%s' % (self.tag, idq_aggregator.floor_div(self.last_save_time, self.cadence), self.cadence, "trg")
+			path = os.path.join(self.out_path, self.tag, self.tag+"-"+str(fname.split("-")[2])[:5])
+			fpath = os.path.join(path, fname)
+			tmpfile = fpath+"~"
+			try:
+				os.makedirs(path)
+			except OSError:
+				pass
+			with open(tmpfile, 'w') as f:
+				f.write(''.join(self.fdata))
+			shutil.move(tmpfile, fpath)
+			latency = numpy.round(int(aggregator.now()) - buftime)
+			print >>sys.stdout, "buftime = %d, latency at write stage = %d" % (buftime, latency)
 
 	def gen_psd_xmldoc(self):
 		xmldoc = lal.series.make_psd_xmldoc(self.psds)
@@ -608,12 +609,22 @@ def parse_command_line():
 
 	parser.add_option("-l", "--latency", action = "store_true", help = "Print latency to output ascii file. Temporary.")
 	parser.add_option("--save-hdf", action = "store_true", default = False, help = "If set, will save hdf5 files to disk straight from dataframe once every cadence")
+	parser.add_option("--trigger-start-time", metavar = "seconds", help = "Set the start time of the segment to output triggers in GPS seconds. Required unless --data-source=lvshm")
+	parser.add_option("--trigger-end-time", metavar = "seconds", help = "Set the end time of the segment to output triggers in GPS seconds. Required unless --data-source=lvshm")
+
 	#
 	# parse the arguments and sanity check
 	#
 
 	options, filenames = parser.parse_args()
 
+	# Sanity check the options
+
+	if options.trigger_start_time is None :
+		options.trigger_start_time = options.gps_start_time
+	if options.trigger_end_time is None :
+		options.trigger_end_time = options.gps_end_time
+
 	return options, filenames
 
 
@@ -636,6 +647,9 @@ channels = data_source_info.channel_dict.keys()
 # dictionary of basis parameters keyed by ifo, rate
 basis_params = {}
 
+if options.use_kafka:
+	from confluent_kafka import Producer
+
 if not options.disable_web_service:
 	#
 	# create a new, empty, Bottle application and make it the current
diff --git a/gstlal-ugly/bin/gstlal_etg_pipe b/gstlal-ugly/bin/gstlal_etg_pipe
index 09acb1ce713c3a0106462b957c083cc6cc735940..616a2a70c32f75f8554a840610ca83dc6bb9f2e1 100755
--- a/gstlal-ugly/bin/gstlal_etg_pipe
+++ b/gstlal-ugly/bin/gstlal_etg_pipe
@@ -56,7 +56,6 @@ lsctables.use_in(LIGOLWContentHandler)
 # get a dictionary of all the segments
 #
 
-#def analysis_segments(analyzable_instruments_set, allsegs, boundary_seg, max_template_length, min_instruments = 2):
 def analysis_segments(ifo, allsegs, boundary_seg, max_template_length = 100): # FIXME Set proper
 	segsdict = segments.segmentlistdict()
 	# 512 seconds for the whitener to settle + the maximum template_length FIXME don't hard code
@@ -64,7 +63,6 @@ def analysis_segments(ifo, allsegs, boundary_seg, max_template_length = 100): #
 	# Chosen so that the overlap is only a ~5% hit in run time for long segments...
 	segment_length = int(10 * start_pad)
 
-	#segsdict[ifo] &= segments.segmentlist([boundary_seg])
 	segsdict[ifo] = segments.segmentlist([boundary_seg])
 	segsdict[ifo] = segsdict[ifo].protract(start_pad)
 	segsdict[ifo] = gstlaldagparts.breakupsegs(segsdict[ifo], segment_length, start_pad)
@@ -73,16 +71,29 @@ #
 
 	return segsdict
 
+def trigger_segments(ifo, allsegs, boundary_seg, max_template_length = 100): # FIXME Set proper
+	trigsegsdict = segments.segmentlistdict()
+	# 512 seconds for the whitener to settle + the maximum template_length FIXME don't hard code
+	start_pad = 512 + max_template_length # FIXME set start_pad to be imported value
+	# Chosen so that the overlap is only a ~5% hit in run time for long segments...
+	segment_length = int(10 * start_pad)
+
+	trigsegsdict[ifo] = segments.segmentlist([boundary_seg])
+	trigsegsdict[ifo] = gstlaldagparts.breakupsegs(trigsegsdict[ifo], segment_length, start_pad)
+	if not trigsegsdict[ifo]:
+		del trigsegsdict[ifo]
+
+	return trigsegsdict
+
 #
 # get a dictionary of all the channels per gstlal_etg job
 #
 
-def etg_node_gen(gstlalETGJob, dag, parent_nodes, segsdict, ifo, options, channels, data_source_info):
+def etg_node_gen(gstlalETGJob, dag, parent_nodes, segsdict, trigsegsdict, ifo, options, channels, data_source_info):
 	etg_nodes = {}
 	cumsum_rates = 0
 	total_rates = 0
 	outstr = ""
-	out_index = 0
 	n_channels = 0
 	n_cpu = 0
 
@@ -104,9 +115,10 @@ def etg_node_gen(gstlalETGJob, dag, parent_nodes, segsdict, ifo, options, channe
 	print "Total jobs needed =", n_cpu
 	print "Evenly distributed streams per job =", int(n_streams)
 
-	for seg in segsdict[ifo]:
+	for seg, trigseg in zip(segsdict[ifo], trigsegsdict[ifo]):
 
 		cumsum_rates = 0
+		out_index = 0
 
 		for ii, channel in enumerate(channels,1):
 			n_channels += 1
@@ -129,6 +141,8 @@ def etg_node_gen(gstlalETGJob, dag, parent_nodes, segsdict, ifo, options, channe
 				inspiral_pipe.generic_node(gstlalETGJob, dag, parent_nodes = parent_nodes,
 					opts = {"gps-start-time":int(seg[0]),
 						"gps-end-time":int(seg[1]),
+						"trigger-start-time":int(trigseg[0]),
+						"trigger-end-time":int(trigseg[1]),
 						"data-source":"frames",
 						"channel-name":outstr,
 						"mismatch":options.mismatch,
@@ -219,12 +233,13 @@ dag = inspiral_pipe.DAG("etg_trigger_pipe")
 gstlalETGJob = inspiral_pipe.generic_job("gstlal_etg", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}))
 
 segsdict = analysis_segments(ifo, data_source_info.frame_segments, boundary_seg, max_template_length)
+trigsegsdict = trigger_segments(ifo, data_source_info.frame_segments, boundary_seg, max_template_length)
 
 #
 # ETG jobs
 #
 
-etg_nodes = etg_node_gen(gstlalETGJob, dag, [], segsdict, ifo, options, channels, data_source_info)
+etg_nodes = etg_node_gen(gstlalETGJob, dag, [], segsdict, trigsegsdict, ifo, options, channels, data_source_info)
 
 #
 # all done
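
The combined effect of the two changes: gstlal_etg now writes a trigger only when its buffer time falls in the half-open window [--trigger-start-time, --trigger-end-time), and gstlal_etg_pipe hands each job a padded analysis segment (protracted by 512 s plus the maximum template length so the whitener can settle) zipped with an unpadded trigger segment, so neighbouring jobs that overlap in analyzed data do not write duplicate triggers. The sketch below models that pairing under simplified assumptions; split(), make_jobs() and keep_trigger() are hypothetical stand-ins for illustration only, not functions from the patch, which builds its segment lists with the segments and gstlaldagparts modules.

# Illustrative sketch only (simplified assumptions, hypothetical helpers):
def split(start, end, length):
	# break [start, end) into back-to-back chunks of at most `length` seconds
	return [(t, min(t + length, end)) for t in range(start, end, length)]

def make_jobs(boundary, start_pad = 612):
	# start_pad stands in for 512 s of whitener settling + max template length;
	# segment_length = 10 * start_pad, as in analysis_segments()
	segment_length = int(10 * start_pad)
	trigsegs = split(boundary[0], boundary[1], segment_length)
	# each analysis segment is padded on both ends, so neighbours overlap,
	# while the matching trigger segment keeps the original boundaries;
	# this mirrors zip(segsdict[ifo], trigsegsdict[ifo]) in etg_node_gen()
	segs = [(s - start_pad, e + start_pad) for s, e in trigsegs]
	return zip(segs, trigsegs)

def keep_trigger(buftime, trigseg):
	# the gate added to MultiChannelHandler: half-open interval [start, end)
	return trigseg[0] <= buftime < trigseg[1]

for seg, trigseg in make_jobs((1187000000, 1187012240)):
	print("analyze %s, write triggers only within %s" % (seg, trigseg))

With the defaults applied in parse_command_line(), a job launched without --trigger-start-time/--trigger-end-time falls back to the full --gps-start-time/--gps-end-time span, so standalone runs behave as before.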