Commit 0215a27e authored by Patrick Godwin's avatar Patrick Godwin


gstlal_feature_extractor_pipe: determine drop time based on fft length, split up combiner jobs into disjoint gps ranges
parent 1f3223c0
Pipeline #71492 passed with warnings
@@ -42,7 +42,7 @@ from gstlal.fxtools import multichannel_datasource
 from gstlal.fxtools import multirate_datasource
 from gstlal.fxtools import utils

-PSD_DROP_TIME = 512
+PSD_DROP_FACTOR = 16

 # =============================
 #
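The new constant works together with the --psd-fft-length option (wired up in parse_command_line below) so that the whitener settling time scales with the PSD FFT length instead of being pinned at 512 s. A quick arithmetic check, assuming the names used in this diff:

    PSD_DROP_FACTOR = 16

    # psd_drop_time = psd_fft_length * PSD_DROP_FACTOR
    assert 32 * PSD_DROP_FACTOR == 512   # a 32 s FFT length reproduces the old fixed drop time
    assert 8 * PSD_DROP_FACTOR == 128    # shorter FFT lengths now drop far less data up front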
@@ -50,14 +50,21 @@ PSD_DROP_TIME = 512
 #
 # =============================

-def analysis_segments(ifo, allsegs, boundary_seg, segment_length, max_template_length = 30):
+def seglist_range(start, stop, stride):
+	b = start
+	while b <= stop:
+		seg = segments.segment(int(b), min(utils.floor_div(int(b) + stride, stride), stop))
+		b = utils.floor_div(int(b) + stride, stride)
+		yield seg
+
+def analysis_segments(ifo, allsegs, boundary_seg, segment_length, psd_drop_time, max_template_length = 30):
 	"""
 	get a dictionary of all the analysis segments
 	"""
 	segsdict = segments.segmentlistdict()

 	# start pad to allow whitener to settle + the maximum template_length
-	start_pad = PSD_DROP_TIME + max_template_length
+	start_pad = psd_drop_time + max_template_length

 	segsdict[ifo] = segments.segmentlist([boundary_seg])
 	segsdict[ifo] = segsdict[ifo].protract(start_pad)
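For reference, a self-contained sketch of what the new seglist_range generator yields, substituting a plain implementation for utils.floor_div (assumed here to round down to a multiple of its second argument; the gstlal definition is not shown in this diff) and tuples for segments.segment:

    def floor_div(x, n):
        # assumed behavior of utils.floor_div: round x down to a multiple of n
        return n * (x // n)

    def seglist_range(start, stop, stride):
        b = start
        while b <= stop:
            yield (int(b), min(floor_div(int(b) + stride, stride), stop))
            b = floor_div(int(b) + stride, stride)

    # segment boundaries snap to stride multiples, so only the first and last
    # segments can be shorter than stride:
    print(list(seglist_range(1000000003, 1000120000, 50000)))
    # [(1000000003, 1000050000), (1000050000, 1000100000), (1000100000, 1000120000)]

Snapping to stride boundaries is what keeps the combiner windows created at the bottom of this file disjoint.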
@@ -103,7 +110,7 @@ def feature_extractor_node_gen(feature_extractor_job, dag, parent_nodes, segsdic
 		# define analysis times
 		gps_start_time = int(seg[0])
-		feature_start_time = gps_start_time + PSD_DROP_TIME + max_template_length
+		feature_start_time = gps_start_time + options.psd_drop_time + max_template_length
 		feature_end_time = min(int(seg[1]), options.gps_end_time)

 		feature_extractor_nodes[(ii, seg)] = \
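A worked example of the new bookkeeping, assuming --psd-fft-length 32 and the default max_template_length of 30 (both values hypothetical):

    gps_start_time = 1187000000         # hypothetical segment start
    psd_drop_time = 32 * 16             # options.psd_fft_length * PSD_DROP_FACTOR
    max_template_length = 30

    feature_start_time = gps_start_time + psd_drop_time + max_template_length
    assert feature_start_time == 1187000542   # first 542 s of the job produce no features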
@@ -169,9 +176,12 @@ def parse_command_line():
 	# FIXME: once we figure out what the maximum concurrency is for parallel reads, should set that as a sanity check

+	# calculate psd drop time based on fft length
+	options.psd_drop_time = options.psd_fft_length * PSD_DROP_FACTOR
+
 	# sanity check to enforce a minimum segment length
 	# Minimum segment length chosen so that the overlap is a ~33% hit in run time
-	min_segment_length = int(4 * PSD_DROP_TIME)
+	min_segment_length = int(4 * options.psd_drop_time)
 	assert options.segment_length >= min_segment_length

 	return options, filenames
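The "~33% hit" comment follows from the pad-to-payload ratio: a segment of the minimum length 4 * psd_drop_time spends one psd_drop_time settling, so (ignoring max_template_length) the overhead relative to useful output is psd_drop_time / (3 * psd_drop_time) ≈ 33%. A quick check with an assumed 32 s FFT length:

    psd_drop_time = 32 * 16                      # 512 s
    min_segment_length = int(4 * psd_drop_time)  # 2048 s

    pad = psd_drop_time                          # time with no feature output
    useful = min_segment_length - pad            # 1536 s of features per segment
    assert round(pad / useful, 2) == 0.33        # ~33% extra run time per segment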
@@ -209,12 +219,12 @@ aggregator.makedir("logs")

 dag = dagparts.DAG("feature_extractor_pipe")

-condor_options = {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}
+condor_options = {"request_memory": options.request_memory, "request_cpus": options.request_cpu, "want_graceful_removal": "True", "kill_sig": "15"}
 condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, condor_options)
 feature_extractor_job = dagparts.DAGJob("gstlal_feature_extractor", condor_commands = condor_commands)

-segsdict = analysis_segments(ifo, data_source_info.frame_segments, data_source_info.seg, options.segment_length, max_template_length=max_template_length)
+segsdict = analysis_segments(ifo, data_source_info.frame_segments, data_source_info.seg, options.segment_length, options.psd_drop_time, max_template_length=max_template_length)

-combiner_condor_options = {"request_memory":"4GB", "request_cpus":2, "want_graceful_removal":"True", "kill_sig":"15"}
+combiner_condor_options = {"request_memory": "4GB", "request_cpus": 1, "want_graceful_removal": "True", "kill_sig": "15"}
 combiner_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, combiner_condor_options)
 feature_combiner_job = dagparts.DAGJob("gstlal_feature_combiner", condor_commands = combiner_condor_commands)
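The start_pad handed to analysis_segments widens each boundary segment on both sides via protract(); a minimal sketch using the ligo.segments package directly (import path assumed, numbers hypothetical):

    from ligo import segments

    boundary_seg = segments.segment(1000000512, 1000010000)
    start_pad = 512 + 30   # psd_drop_time + max_template_length

    # protract() pushes each segment out by start_pad on both ends, so the
    # whitener has settled by the time the original boundary is reached:
    seglist = segments.segmentlist([boundary_seg]).protract(start_pad)
    assert seglist[0] == segments.segment(999999970, 1000010542)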
@@ -231,7 +241,12 @@ feature_combiner_options = {
 	"instrument": ifo,
 	"tag": "offline",
 }
-feature_combiner_nodes = dagparts.DAGNode(feature_combiner_job, dag, parent_nodes = feature_extractor_nodes.values(), opts = feature_combiner_options)
+
+for seg in seglist_range(data_source_info.seg[0], data_source_info.seg[1], 50000):
+	parent_nodes = [node for (i, job_seg), node in feature_extractor_nodes.items() if seg.intersects(job_seg)]
+	these_options = dict(feature_combiner_options)
+	these_options.update({"start-time": seg[0], "end-time": seg[1]})
+	feature_combiner_nodes = dagparts.DAGNode(feature_combiner_job, dag, parent_nodes = parent_nodes, opts = these_options)

 #
 # write out dag and sub files
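To illustrate the new fan-in, a minimal stand-in for the dependency wiring (dummy node names instead of dagparts.DAGNode objects, hypothetical GPS times): each combiner window claims only the feature-extractor jobs whose segments it overlaps, so one slow job no longer gates combiners across the whole run:

    # hypothetical entries keyed like feature_extractor_nodes: (index, segment) -> node
    feature_extractor_nodes = {
        (0, (1000000000, 1000004000)): "fx_job_0",
        (1, (1000004000, 1000008000)): "fx_job_1",
        (2, (1000052000, 1000056000)): "fx_job_2",
    }

    def intersects(a, b):
        # stand-in for segments.segment.intersects()
        return a[0] < b[1] and b[0] < a[1]

    combiner_window = (1000000000, 1000050000)
    parent_nodes = [node for (i, seg), node in feature_extractor_nodes.items() if intersects(combiner_window, seg)]
    assert parent_nodes == ["fx_job_0", "fx_job_1"]   # fx_job_2 belongs to the next window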