From 3ecef835d6995f3f38d4a47f8fa3ada47b6de99d Mon Sep 17 00:00:00 2001
From: Duncan Meacher <duncan.meacher@ligo.org>
Date: Thu, 2 Nov 2017 13:00:05 -0700
Subject: [PATCH] Included segment info. Split offline jobs into ~5120s chunks

---
 gstlal-ugly/bin/gstlal_etg_pipe               | 115 +++++++++++-------
 .../share/etg/Makefile.gstlal_etg_offline     |  57 ++++++++-
 2 files changed, 123 insertions(+), 49 deletions(-)

diff --git a/gstlal-ugly/bin/gstlal_etg_pipe b/gstlal-ugly/bin/gstlal_etg_pipe
index ca67a6349f..09acb1ce71 100755
--- a/gstlal-ugly/bin/gstlal_etg_pipe
+++ b/gstlal-ugly/bin/gstlal_etg_pipe
@@ -51,11 +51,33 @@ class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
 	pass
 lsctables.use_in(LIGOLWContentHandler)
 
+
+#
+# get a dictionary of all the segments
+#
+
+#def analysis_segments(analyzable_instruments_set, allsegs, boundary_seg, max_template_length, min_instruments = 2):
+def analysis_segments(ifo, allsegs, boundary_seg, max_template_length = 100):  # FIXME Set proper default from template bank
+	segsdict = segments.segmentlistdict()
+	# 512 seconds for the whitener to settle + the maximum template_length FIXME don't hard code
+	start_pad = 512 + max_template_length # FIXME set start_pad to be imported value
+	# Chosen so that the overlap is only a ~5% hit in run time for long segments...
+	segment_length = int(10 * start_pad)
+
+	#segsdict[ifo] &= segments.segmentlist([boundary_seg])
+	segsdict[ifo] = segments.segmentlist([boundary_seg])
+	segsdict[ifo] = segsdict[ifo].protract(start_pad)
+	segsdict[ifo] = gstlaldagparts.breakupsegs(segsdict[ifo], segment_length, start_pad)
+	if not segsdict[ifo]:
+		del segsdict[ifo]
+
+	return segsdict
+
 #
 # get a dictionary of all the channels per gstlal_etg job
 #
 
-def etg_node_gen(gstlalETGJob, dag, parent_nodes, options, channels, data_source_info):
+def etg_node_gen(gstlalETGJob, dag, parent_nodes, segsdict, ifo, options, channels, data_source_info):
 	etg_nodes = {}
 	cumsum_rates = 0
 	total_rates = 0
@@ -82,45 +104,47 @@ def etg_node_gen(gstlalETGJob, dag, parent_nodes, options, channels, data_source
 		print "Total jobs needed =", n_cpu
 		print "Evenly distributed streams per job =", int(n_streams)
 
-	cumsum_rates = 0
-
-	for ii, channel in enumerate(channels,1):
-		n_channels += 1
-		samp_rate = data_source_info.channel_dict[channel]['fsamp']
-		max_samp_rate = min(2048, int(samp_rate))
-		min_samp_rate = min(32, max_samp_rate)
-		n_rates = int(numpy.log2(max_samp_rate/min_samp_rate) + 1)
-		cumsum_rates += n_rates
-		outstr = outstr + channel + ":" + str(int(samp_rate))
-
-		# Adds channel to current process
-		if cumsum_rates < n_streams:
-			outstr = outstr + " --channel-name="
-
-		# Finalise each process once number of streams passes threshold
-		if cumsum_rates >= n_streams or ii == len(data_source_info.channel_dict.keys()):
-			out_index += 1
-			outpath = options.out_path + "/gstlal_etg/gstlal_etg_%04d" % out_index
-			etg_nodes[channel] = \
-				inspiral_pipe.generic_node(gstlalETGJob, dag, parent_nodes = parent_nodes,
-					opts = {"gps-start-time":options.gps_start_time,
-						"gps-end-time":options.gps_end_time,
-						"data-source":"frames",
-						"channel-name":outstr,
-						"mismatch":options.mismatch,
-						"qhigh":options.qhigh,
-						"cadence":options.cadence,
-						#"triggers-from-dataframe":"",
-						"disable-web-service":""
-					},
-					input_files = {"frame-cache":options.frame_cache},
-					output_files = {"out-path":outpath}
-				)
-			if options.verbose:
-				print "Job %04d, number of channels = %3d, number of streams = %4d" %(out_index, n_channels, cumsum_rates)
-			cumsum_rates = 0
-			outstr = ""
-			n_channels = 0
+	for seg in segsdict[ifo]:
+
+		cumsum_rates = 0
+
+		for ii, channel in enumerate(channels,1):
+			n_channels += 1
+			samp_rate = data_source_info.channel_dict[channel]['fsamp']
+			max_samp_rate = min(2048, int(samp_rate))
+			min_samp_rate = min(32, max_samp_rate)
+			n_rates = int(numpy.log2(max_samp_rate/min_samp_rate) + 1)
+			cumsum_rates += n_rates
+			outstr = outstr + channel + ":" + str(int(samp_rate))
+
+			# Adds channel to current process
+			if cumsum_rates < n_streams:
+				outstr = outstr + " --channel-name="
+
+			# Finalise each process once number of streams passes threshold
+			if cumsum_rates >= n_streams or ii == len(data_source_info.channel_dict.keys()):
+				out_index += 1
+				outpath = options.out_path + "/gstlal_etg/gstlal_etg_%04d" % out_index
+				etg_nodes[channel] = \
+					inspiral_pipe.generic_node(gstlalETGJob, dag, parent_nodes = parent_nodes,
+						opts = {"gps-start-time":int(seg[0]),
+							"gps-end-time":int(seg[1]),
+							"data-source":"frames",
+							"channel-name":outstr,
+							"mismatch":options.mismatch,
+							"qhigh":options.qhigh,
+							"cadence":options.cadence,
+							#"triggers-from-dataframe":"",
+							"disable-web-service":""
+						},
+						input_files = {"frame-cache":options.frame_cache},
+						output_files = {"out-path":outpath}
+					)
+				if options.verbose:
+					print "Job %04d, number of channels = %3d, number of streams = %4d" %(out_index, n_channels, cumsum_rates)
+				cumsum_rates = 0
+				outstr = ""
+				n_channels = 0
 
 	return etg_nodes
 
@@ -172,8 +196,11 @@ output_dir = "plots"
 #
 
 data_source_info = multichannel_datasource.DataSourceInfo(options)
-instrument = data_source_info.instrument
+ifo = data_source_info.instrument
 channels = data_source_info.channel_dict.keys()
+boundary_seg = data_source_info.seg
+
+max_template_length = 100
 
 #
 # Setup the dag
@@ -183,7 +210,7 @@ try:
 	os.mkdir("logs")
 except:
 	pass
-dag = inspiral_pipe.DAG("etg_pipe")
+dag = inspiral_pipe.DAG("etg_trigger_pipe")
 
 #
 # setup the job classes
@@ -191,11 +218,13 @@ dag = inspiral_pipe.DAG("etg_pipe")
 
 gstlalETGJob = inspiral_pipe.generic_job("gstlal_etg", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}))
 
+segsdict = analysis_segments(ifo, data_source_info.frame_segments, boundary_seg, max_template_length)
+
 #
 # ETG jobs
 #
 
-etg_nodes = etg_node_gen(gstlalETGJob, dag, [], options, channels, data_source_info)
+etg_nodes = etg_node_gen(gstlalETGJob, dag, [], segsdict, ifo, options, channels, data_source_info)
 
 #
 # all done
diff --git a/gstlal-ugly/share/etg/Makefile.gstlal_etg_offline b/gstlal-ugly/share/etg/Makefile.gstlal_etg_offline
index adca72ef65..572746af1b 100644
--- a/gstlal-ugly/share/etg/Makefile.gstlal_etg_offline
+++ b/gstlal-ugly/share/etg/Makefile.gstlal_etg_offline
@@ -2,7 +2,7 @@ SHELL := /bin/bash
 # condor commands
 # Set the accounting tag from https://ldas-gridmon.ligo.caltech.edu/ldg_accounting/user
 ACCOUNTING_TAG=ligo.dev.o3.detchar.onlinedq.idq
-GROUP_USER=albert.einstein
+GROUP_USER=duncan.meacher
 CONDOR_COMMANDS:=--condor-command=accounting_group=$(ACCOUNTING_TAG) --condor-command=accounting_group_user=$(GROUP_USER)
 
 #########################
@@ -10,9 +10,17 @@ CONDOR_COMMANDS:=--condor-command=accounting_group=$(ACCOUNTING_TAG) --condor-co
 #########################
 
 # The GPS start time for analysis
-START = 1176638000
+#START = 1176638000
+#START = 1176630000
+
+START = 1187000000
+
 # The GPS end time for analysis
-STOP = 1176639000
+#STOP  = 1176639000
+#STOP  = 1176640000
+
+#STOP  = 1188000000
+STOP  = 1187100000
 
 OUTPATH = $(PWD)
 # Number of streams (N_channels x N_rates_per_channel) that each processor will analise
@@ -23,8 +31,39 @@ QHIGH = 40
 # Detector
 CLUSTER:=$(shell hostname -d)
 
+IFO = H1
+#IFO = L1
+
+###############################
+# Segment and frame type info #
+###############################
+
+# Info from https://wiki.ligo.org/viewauth/LSC/JRPComm/ObsRun2
+# Select correct calibration type
+# GSTLAL_SEGMENTS Options
+SEG_SERVER=https://segments.ligo.org
+# C00
+LIGO_SEGMENTS="$(IFO):DMT-ANALYSIS_READY:1"
+# C01
+#LIGO_SEGMENTS="$*:DCS-ANALYSIS_READY_C01:1"
+# C02
+#LIGO_SEGMENTS="$*:DCS-ANALYSIS_READY_C02:1"
+
+SEGMENT_TRIM = 0
+SEGMENT_MIN_LENGTH = 512
+
 FRAME_TYPE=R
 
+#######################
+# GSTLAL VETO Options #
+#######################
+
+# Vetoes file names
+# Obtain veto definer from here: https://code.pycbc.phy.syr.edu/detchar/veto-definitions/blob/master/cbc/O2/
+# As of commit on August 30, 2017: "Added new version of L1, H1 earthquake flag"
+COMMIT = 5449edf6bf96fbd428add6551a51397bc5777f11
+VETODEF = H1L1-HOFT_C00_O2_CBC.xml
+
 #################
 # Web directory #
 #################
@@ -44,11 +83,11 @@ WEBDIR = ~/public_html/observing/$(TAG)/$(START)-$(STOP)-$(RUN)
 ############
 
 all : dag
-	sed -i '/gstlal_etg / s/$$/ |& grep -v '\''XLAL\|GSL\|Generic'\''/' etg_pipe.sh
-	@echo "Submit with: condor_submit_dag etg_pipe.dag"
+	sed -i '/gstlal_etg / s/$$/ |& grep -v '\''XLAL\|GSL\|Generic'\''/' etg_trigger_pipe.sh
+	@echo "Submit with: condor_submit_dag etg_trigger_pipe.dag"
 
 # Run etg pipe to produce dag
-dag : frame.cache plots channel_list.txt
+dag : frame.cache plots channel_list.txt segments.xml.gz
 	./gstlal_etg_pipe \
 		--data-source frames \
 		--gps-start-time $(START) \
@@ -70,6 +109,12 @@ dag : frame.cache plots channel_list.txt
 full_channel_list.txt : frame.cache
 	FrChannels $$(head -n 1 $^ | awk '{ print $$5}' | sed -e "s@file://localhost@@g") > $@
 
+# Produce segments file
+segments.xml.gz : frame.cache
+	ligolw_segment_query_dqsegdb --segment-url=${SEG_SERVER} -q --gps-start-time ${START} --gps-end-time ${STOP} --include-segments=$(LIGO_SEGMENTS) --result-name=datasegments > $@
+	ligolw_cut --delete-column segment:segment_def_cdb --delete-column segment:creator_db --delete-column segment_definer:insertion_time $@
+	gstlal_segments_trim --trim $(SEGMENT_TRIM) --gps-start-time $(START) --gps-end-time $(STOP) --min-length $(SEGMENT_MIN_LENGTH) --output $@ $@
+
 frame.cache :
 	# FIXME force the observatory column to actually be instrument
 	if [[ ${CLUSTER} == *"ligo-wa.caltech.edu" ]] ; then \
-- 
GitLab