From 0215a27e6b6e7d200e00efa070b1d1afbc236fea Mon Sep 17 00:00:00 2001
From: Patrick Godwin <patrick.godwin@ligo.org>
Date: Tue, 23 Jul 2019 10:25:40 -0700
Subject: [PATCH] gstlal_feature_extractor_pipe: determine PSD drop time based
 on FFT length, split combiner jobs into disjoint GPS ranges

Compute the PSD drop time as PSD_DROP_FACTOR (16) times the configured
psd_fft_length instead of hard-coding it to 512, so the whitener settling
time scales with the FFT length. Split the single feature combiner job into
one job per 50000 s GPS range, each depending only on the feature extractor
jobs whose segments intersect its range; the combiner CPU request also
drops from 2 to 1.

---
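A minimal standalone sketch of what the new seglist_range() generator
yields, assuming utils.floor_div(x, n) rounds x down to the nearest
multiple of n and that segments.segment behaves like a (start, end)
pair (the names below are illustrative, not part of the patch):

    def floor_div(x, n):
        # round x down to the nearest multiple of n
        return (x // n) * n

    def seglist_range(start, stop, stride):
        # yield disjoint (start, end) pairs covering [start, stop),
        # with every boundary after the first aligned to stride
        b = int(start)
        while b < stop:
            b_next = floor_div(b + stride, stride)
            yield (b, min(b_next, int(stop)))
            b = b_next

    print(list(seglist_range(1187003000, 1187120000, 50000)))
    # [(1187003000, 1187050000), (1187050000, 1187100000),
    #  (1187100000, 1187120000)]

Each feature combiner job is handed one such range through its
start-time/end-time options and made a child of only those feature
extractor jobs whose analysis segments intersect that range.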
 .../bin/gstlal_feature_extractor_pipe         | 33 ++++++++++++++-----
 1 file changed, 24 insertions(+), 9 deletions(-)

diff --git a/gstlal-burst/bin/gstlal_feature_extractor_pipe b/gstlal-burst/bin/gstlal_feature_extractor_pipe
index 7a15ab9dcc..259a80724d 100755
--- a/gstlal-burst/bin/gstlal_feature_extractor_pipe
+++ b/gstlal-burst/bin/gstlal_feature_extractor_pipe
@@ -42,7 +42,7 @@ from gstlal.fxtools import multichannel_datasource
 from gstlal.fxtools import multirate_datasource
 from gstlal.fxtools import utils
 
-PSD_DROP_TIME = 512
+PSD_DROP_FACTOR = 16
 
 # =============================
 #
@@ -50,14 +50,21 @@ PSD_DROP_TIME = 512
 #
 # =============================
 
-def analysis_segments(ifo, allsegs, boundary_seg, segment_length, max_template_length = 30):
+def seglist_range(start, stop, stride):
+	b = start
+	while b < stop:
+		b_next = utils.floor_div(int(b) + stride, stride)
+		yield segments.segment(int(b), min(b_next, stop))
+		b = b_next
+
+def analysis_segments(ifo, allsegs, boundary_seg, segment_length, psd_drop_time, max_template_length = 30):
 	"""
 	get a dictionary of all the analysis segments
 	"""
 	segsdict = segments.segmentlistdict()
 
 	# start pad to allow whitener to settle + the maximum template_length
-	start_pad = PSD_DROP_TIME + max_template_length
+	start_pad = psd_drop_time + max_template_length
 
 	segsdict[ifo] = segments.segmentlist([boundary_seg])
 	segsdict[ifo] = segsdict[ifo].protract(start_pad)
@@ -103,7 +110,7 @@ def feature_extractor_node_gen(feature_extractor_job, dag, parent_nodes, segsdic
 
 			# define analysis times
 			gps_start_time = int(seg[0])
-			feature_start_time = gps_start_time + PSD_DROP_TIME + max_template_length
+			feature_start_time = gps_start_time + options.psd_drop_time + max_template_length
 			feature_end_time = min(int(seg[1]), options.gps_end_time)
 
 			feature_extractor_nodes[(ii, seg)] = \
@@ -169,9 +176,12 @@ def parse_command_line():
 
 	# FIXME: once we figure out what the maximum concurrency is for parallel reads, should set that as a sanity check
 
+	# calculate psd drop time based on fft length
+	options.psd_drop_time = options.psd_fft_length * PSD_DROP_FACTOR
+
 	# sanity check to enforce a minimum segment length
 	# Minimum segment length chosen so that the overlap is a ~33% hit in run time
-	min_segment_length = int(4 * PSD_DROP_TIME)
+	min_segment_length = int(4 * options.psd_drop_time)
 	assert options.segment_length >= min_segment_length
 
 	return options, filenames
@@ -209,12 +219,12 @@ aggregator.makedir("logs")
 
 dag = dagparts.DAG("feature_extractor_pipe")
 
-condor_options = {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}
+condor_options = {"request_memory": options.request_memory, "request_cpus": options.request_cpu, "want_graceful_removal": "True", "kill_sig": "15"}
 condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, condor_options)
 feature_extractor_job = dagparts.DAGJob("gstlal_feature_extractor", condor_commands = condor_commands)
-segsdict = analysis_segments(ifo, data_source_info.frame_segments, data_source_info.seg, options.segment_length, max_template_length=max_template_length)
+segsdict = analysis_segments(ifo, data_source_info.frame_segments, data_source_info.seg, options.segment_length, options.psd_drop_time, max_template_length=max_template_length)
 
-combiner_condor_options = {"request_memory":"4GB", "request_cpus":2, "want_graceful_removal":"True", "kill_sig":"15"}
+combiner_condor_options = {"request_memory": "4GB", "request_cpus": 1, "want_graceful_removal": "True", "kill_sig": "15"}
 combiner_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, combiner_condor_options)
 feature_combiner_job = dagparts.DAGJob("gstlal_feature_combiner", condor_commands = combiner_condor_commands)
 
@@ -231,7 +241,12 @@ feature_combiner_options = {
 	"instrument": ifo,
 	"tag": "offline",
 }
-feature_combiner_nodes = dagparts.DAGNode(feature_combiner_job, dag, parent_nodes = feature_extractor_nodes.values(), opts = feature_combiner_options)
+
+for seg in seglist_range(data_source_info.seg[0], data_source_info.seg[1], 50000):
+	parent_nodes = [node for (i, job_seg), node in feature_extractor_nodes.items() if seg.intersects(job_seg)]
+	these_options = dict(feature_combiner_options)
+	these_options.update({"start-time": seg[0], "end-time": seg[1]})
+	feature_combiner_nodes = dagparts.DAGNode(feature_combiner_job, dag, parent_nodes = parent_nodes, opts = these_options)
 
 #
 # write out dag and sub files
-- 
GitLab