diff --git a/gstlal-inspiral/bin/gstlal_inspiral_workflow b/gstlal-inspiral/bin/gstlal_inspiral_workflow
index 617eaa70814462dbf04e1bd700b65d13208611a1..d8b0c93b4f7fb7cd539ffe814e519fff97a3e804 100755
--- a/gstlal-inspiral/bin/gstlal_inspiral_workflow
+++ b/gstlal-inspiral/bin/gstlal_inspiral_workflow
@@ -22,9 +22,11 @@ import os
 import shutil
 import sys
 
+from lal.utils import CacheEntry
+
 from gstlal.config.inspiral import Config
 from gstlal.dags.inspiral import DAG
-from gstlal.datafind import DataCache, DataType
+from gstlal.datafind import DataCache, DataType, find_online
 from gstlal.workflows import write_makefile
 
 
@@ -103,9 +105,17 @@ elif args.command == "create":
 	else:
 		# load filter data products
 		dist_stats = DataCache.find(DataType.MARG_DIST_STATS, root=rank_dir, svd_bins="*")
-		triggers = DataCache.find(DataType.TRIGGERS, root=filter_dir, svd_bins="*", subtype="*")
-		inj_triggers = DataCache.find(DataType.TRIGGERS, root=filter_dir, svd_bins="*")
-		injections = DataCache.find(DataType.INJECTIONS, root=config.data.rerank_dir)
+		# use the dist_stats' end times to determine which trigger files should be used
+		bin_dict = {} # records the end time of dist_stats for every bin
+		for c in dist_stats.cache:
+			bin_dict[c.description[:4]] = int(c.segment[1]) if int(c.segment[1]) < int(config.stop) else int(config.stop)
+
+		# load the rest of the filter data products
+		segments = find_online(config.rank.segments_name, root = os.path.join(config.data.rerank_dir, 'segments'), svd_bins = bin_dict.keys(), subtype = "TRIGGERS")
+		triggers = find_online(config.rank.triggers_name, start = int(config.start), end = bin_dict, root = config.data.analysis_dir, svd_bins = bin_dict.keys(), subtype = "TRIGGERS")
+		inj_triggers = find_online(config.rank.inj_triggers_name, start = int(config.start), end = bin_dict, root = config.data.analysis_dir, svd_bins = bin_dict.keys(), subtype = "TRIGGERS", segments = segments)
+		## inverted segments are used as vetoes for when the original analysis was off
+		#inverted_segments = find_online(config.rank.segments_name, root = os.path.join(config.data.rerank_dir, 'inverted_segments'), subtype = "TRIGGERS")
 		triggers += inj_triggers
 		triggers = dag.add_and_simplify(triggers)
 		triggers = dag.calc_likelihood(triggers, dist_stats)
@@ -115,13 +125,13 @@ elif args.command == "create":
 	pdfs = dag.marginalize_pdf(pdfs)
 
 	if config.filter.injections:
-		triggers, inj_triggers = dag.find_injections_lite(triggers, injections)
+		triggers, inj_triggers = dag.find_injections_lite(triggers)
 
 	triggers, pdfs = dag.compute_far(triggers, pdfs)
 
 	dag.plot_summary(triggers, pdfs)
 	dag.plot_background(triggers, pdfs)
-	dag.plot_bin_background(dist_stats)
+	#dag.plot_bin_background(dist_stats)
 	dag.plot_sensitivity(triggers)
 
 	if args.workflow in set(("full", "rerank")):
diff --git a/gstlal-inspiral/python/dags/layers/inspiral.py b/gstlal-inspiral/python/dags/layers/inspiral.py
index d94728ad68525cbcd400d37c8fdb84f28a804bc7..46bbe9a417a5cb36a9b8abcefd52e4eaa552aae8 100644
--- a/gstlal-inspiral/python/dags/layers/inspiral.py
+++ b/gstlal-inspiral/python/dags/layers/inspiral.py
@@ -695,7 +695,7 @@ def marginalize_pdf_layer(config, dag, pdf_cache):
 	return marg_pdf_cache
 
 
-def calc_likelihood_layer(config, dag, trigger_cache, dist_stat_cache):
+def calc_likelihood_layer(config, dag, trigger_cache, dist_stat_cache, inverted_segments = None):
 	layer = Layer(
 		"gstlal_inspiral_calc_likelihood",
 		requirements={
@@ -711,6 +711,8 @@ def calc_likelihood_layer(config, dag, trigger_cache, dist_stat_cache):
 	calc_trigger_cache = trigger_cache.copy(root="rank")
 	calc_triggers = calc_trigger_cache.groupby("bin", "subtype", "dirname")
 	dist_stats = dist_stat_cache.groupby("bin")
+	if inverted_segments:
+		inverted_segments = inverted_segments.groupby("bin")
 
 	for (svd_bin, inj_type, dirname), triggers in trigger_cache.groupby("bin", "subtype", "dirname").items():
 		# find path relative to current directory
@@ -730,14 +732,20 @@ def calc_likelihood_layer(config, dag, trigger_cache, dist_stat_cache):
 		if not config.condor.transfer_files:
 			arguments.append(Option("copy-dir", calc_dirname))
 
-		layer += Node(
-			arguments = arguments,
-			inputs = [
+		inputs = [
 				Argument("mass-model", config.prior.mass_model, track=False, suppress=True),
 				Argument("triggers", triggers.files),
 				Option("likelihood-url", dist_stats[svd_bin].files),
 				*add_ranking_stat_file_options(config, svd_bin, transfer_only=True),
-			],
+		]
+		if inverted_segments:
+			inv_seg = inverted_segments[svd_bin].files
+			assert len(inv_seg) == 1, "Can only give 1 inverted segment file per bin"
+			inputs.append(Option("vetoes-name", inv_seg[0]))
+
+		layer += Node(
+			arguments = arguments,
+			inputs = inputs,
 			outputs = Argument(
 				"calc-triggers",
 				calc_triggers[(svd_bin, inj_type, calc_dirname)].files,
@@ -1013,7 +1021,7 @@ def find_injections_layer(config, dag, trigger_db_cache):
 	return trigger_db_cache, inj_trigger_cache
 
 
-def find_injections_lite_layer(config, dag, trigger_cache, injections):
+def find_injections_lite_layer(config, dag, trigger_cache):
 	inj_sqlite_to_xml_layer = Layer(
 		"ligolw_sqlite",
 		name="inj_sqlite_to_xml",
@@ -1025,6 +1033,17 @@ def find_injections_lite_layer(config, dag, trigger_cache, injections):
 		},
 		transfer_files=config.condor.transfer_files,
 	)
+	add_sim_to_inj_layer = Layer(
+		"ligolw_add",
+		name="add_sim_inspiral_table_to_inj_trigger_file",
+		requirements={
+			"request_cpus": 1,
+			"request_memory": 2000,
+			"request_disk": "1GB",
+			**config.condor.submit
+		},
+		transfer_files=config.condor.transfer_files,
+	)
 	injfind_layer = Layer(
 		"lalinspiral_injfind",
 		requirements={
@@ -1049,22 +1068,28 @@ def find_injections_lite_layer(config, dag, trigger_cache, injections):
 	grouped_trigger_cache = trigger_cache.groupby("subtype")
 	if "" in grouped_trigger_cache:
-		grouped_trigger_cache.pop("")
+		grouped_trigger_cache.pop("")
 
 	inj_trigger_files = []
 
 	# generate layers
 	for inj_type, trigger_subcache in grouped_trigger_cache.items():
 		for trigger_file in trigger_subcache.files:
-			# convert triggers to XML and add sim_inspiral table
+			# convert triggers to XML
 			inj_sqlite_to_xml_layer += Node(
 				arguments = [
 					Option("replace"),
 					Option("tmp-space", dagutil.condor_scratch_space()),
 				],
-				inputs = [Option("database", trigger_file), Argument("inputs", injections.files)],
+				inputs = [Option("database", trigger_file)],
 				outputs = Option("extract", trigger_file.replace('sqlite', 'xml')),
 			)
+			# Add a sim inspiral table
+			add_sim_to_inj_layer += Node(
+				inputs = [Argument("input-trigger-file", trigger_file.replace('sqlite', 'xml')), Argument("input-injection-file", config.filter.injections[inj_type].file)],
+				outputs = Option("output", trigger_file.replace('sqlite', 'xml')),
+			)
+
 
 			# find injections
 			injfind_layer += Node(
 				arguments = Option("time-window", 0.9),
@@ -1086,6 +1111,7 @@ def find_injections_lite_layer(config, dag, trigger_cache, injections):
 	inj_trigger_cache = DataCache.from_files(DataType.TRIGGERS, inj_trigger_files)
 
 	dag.attach(inj_sqlite_to_xml_layer)
+	dag.attach(add_sim_to_inj_layer)
 	dag.attach(injfind_layer)
 	dag.attach(inj_xml_to_sqlite_layer)
 
@@ -2313,7 +2339,8 @@ def add_and_simplify_layer(config, dag, trigger_cache):
 			"request_memory": 2000,
 			"request_disk": "2GB",
 			**config.condor.submit
-		},
+
+		},
 		transfer_files=config.condor.transfer_files,
 	)
 	simplify_layer = Layer(
@@ -2330,20 +2357,32 @@ def add_and_simplify_layer(config, dag, trigger_cache):
 
 		aggregate_trigger_files = []
 		for (svd_bin, trig_type), triggers in trigger_subcache.groupby("bin", "subtype").items():
-			for trigger_subset in triggers.chunked(config.rank.merge_level_two_max_files):
+			for trigger_subset in triggers.chunked(num_files(level)):
 				times = list(trigger_subset.groupby("time").keys())
 				min_time = min(times)
 				max_time = max(times)
-				aggregate_trigger_filename = trigger_subset.name.filename(
-					config.ifos,
-					span=(min_time[0], max_time[1]),
-					svd_bin=f"{svd_bin}",
-					subtype=trig_type,
-				)
-				aggregate_trigger_path = os.path.join(
-					trigger_subset.name.directory(root="filter", start=min(times)[0]),
-					aggregate_trigger_filename,
-				)
+
+				if level == 1:
+					# level 1 adding is expected to be for adding files from the analysis dir
+					# and saving them in the rerank dir, in offline format
+					aggregate_trigger_filename = DataType.TRIGGERS.filename(
+						config.ifos,
+						span=(min_time[0], max_time[1]),
+						svd_bin=f"{svd_bin}",
+						subtype="" if config.rank.triggers_name in trig_type else list(config.filter.injections.keys())[0]
+					)
+					aggregate_trigger_path = os.path.join("filter", "triggers", str(min_time[0])[:5], aggregate_trigger_filename)
+				else:
+					aggregate_trigger_filename = trigger_subset.name.filename(
+						config.ifos,
+						span=(min_time[0], max_time[1]),
+						svd_bin=f"{svd_bin}",
+						subtype=trig_type,
+					)
+					aggregate_trigger_path = os.path.join(
+						trigger_subset.name.directory(root="filter", start=min_time[0]),
+						aggregate_trigger_filename,
+					)
 				aggregate_trigger_files.append(aggregate_trigger_path)
 				if len(trigger_subset.cache) <= 1:
 					continue
@@ -2368,6 +2407,12 @@ def add_and_simplify_layer(config, dag, trigger_cache):
 		)
 		return aggregate_trigger_cache, add_layer, simplify_layer
 
+	def num_files(level):
+		if level == 1:
+			return config.rank.add_level_one_max_files
+		else:
+			return config.rank.add_level_two_max_files
+
 	level = 1
 	remaining_cache = trigger_cache
 	remaining_cache.cache = sorted(remaining_cache.cache)
@@ -2412,7 +2457,6 @@ def merge_and_reduce_layer(config, dag, trigger_cache):
 		)
 
 		partial_trigger_files = []
-		partial_trigger_db_files = []
 
 		for (inj_type), triggers in trigger_subcache.groupby("subtype").items():
 			for trigger_subset in triggers.chunked(num_files(level)):
@@ -2433,11 +2477,10 @@ def merge_and_reduce_layer(config, dag, trigger_cache):
 					subtype=inj_type,
 				)
 				partial_trigger_path = os.path.join(
-					trigger_subset.name.directory(root="filter", start=min(times)[0]),
+					trigger_subset.name.directory(root="rank", start=min(times)[0]),
 					partial_trigger_filename,
 				)
 				partial_trigger_files.append(partial_trigger_path)
-				partial_trigger_db_files.append(partial_trigger_path.replace('xml.gz', 'sqlite'))
 				if len(trigger_subset.cache) <= 1:
 					continue
 
@@ -2458,12 +2501,8 @@ def merge_and_reduce_layer(config, dag, trigger_cache):
 			DataType.TRIGGERS,
 			partial_trigger_files
 		)
-		partial_trigger_db_cache = DataCache.from_files(
-			DataType.TRIGGERS,
-			partial_trigger_db_files
-		)
 
-		return partial_trigger_cache, partial_trigger_db_cache, merge_layer, sqlite_layer
+		return partial_trigger_cache, merge_layer, sqlite_layer
 
 	def num_files(level):
 		if level == 1:
@@ -2476,13 +2515,42 @@ def merge_and_reduce_layer(config, dag, trigger_cache):
 
 	level = 1
 	while len(remaining_cache.cache) > len(trigger_cache.groupby("subtype").keys()):
-		remaining_cache, remaining_db_cache, merge_layer, sqlite_layer = attach_merge(remaining_cache, level)
+		remaining_cache, merge_layer, sqlite_layer = attach_merge(remaining_cache, level)
 		dag.attach(merge_layer)
-		if len(remaining_cache.cache) > len(trigger_cache.groupby("subtype").keys()):
-			# only convert back to xml if it's not the last level
-			dag.attach(sqlite_layer)
+		dag.attach(sqlite_layer)
 		level += 1
 
+	add_segments_layer = Layer(
+		"ligolw_sqlite",
+		name="add_segments_to_triggers",
+		requirements={
+			"request_cpus": 1,
+			"request_memory": 2000,
+			"request_disk": "1GB",
+			**config.condor.submit
+		},
+		transfer_files=config.condor.transfer_files,
+	)
+
+	remaining_db_cache = []
+	for (inj_type), trigger in remaining_cache.groupby("subtype").items():
+		add_segments_layer += Node(
+			arguments = [
+				Option("replace"),
+				Option("tmp-space", dagutil.condor_scratch_space()),
+			],
+			inputs = [Argument("trigger-file", trigger.files), Argument("segments-file", config.source.frame_segments_file)],
+			outputs = Option("database", trigger.files[0].replace("xml.gz", "sqlite")),
+		)
+		remaining_db_cache.append(trigger.files[0].replace("xml.gz", "sqlite"))
+
+	dag.attach(add_segments_layer)
+
+	remaining_db_cache = DataCache.from_files(
+		DataType.TRIGGERS,
+		remaining_db_cache
+	)
+
 	return remaining_db_cache
diff --git a/gstlal-ugly/bin/gstlal_ll_merge_segments b/gstlal-ugly/bin/gstlal_ll_merge_segments
index b27d0d304af6ed7016a4ab03c3489f39088425ec..dcc9f4a04c1af5f9fbe884c59085b19402eca5d8 100644
--- a/gstlal-ugly/bin/gstlal_ll_merge_segments
+++ b/gstlal-ugly/bin/gstlal_ll_merge_segments
@@ -11,6 +11,7 @@ from ligo.lw import utils as ligolw_utils
 from ligo.lw.utils import segments as ligolw_segments
 from ligo.lw.utils import process as ligolw_process
 from lal.utils import CacheEntry
+from lal import LIGOTimeGPS
 
 @ligolw_array.use_in
 @ligolw_param.use_in
@@ -21,24 +22,42 @@ class LIGOLWContentHandler(ligolw.LIGOLWContentHandler):
 
 parser = argparse.ArgumentParser(description = 'Combine segments from low-latency analysis')
 parser.add_argument("--analysis-dir", metavar = "uri", help = "Path to the low-latency analysis directory")
 parser.add_argument("--segments-name", metavar = "name", default = "statevectorsegments", help = "Name of segments to query")
-#parser.add_argument("--svdbin", metavar = "svdbin", default = "0000", help = "Pick one svdbin")
+parser.add_argument("--svd-bin", metavar = "svdbin", default = None, help = "Set the SVD bin to merge segments of")
 parser.add_argument("--subdir", default = "None", help = "Set the subdirectory to search in")
-parser.add_argument("--start", default = 0, help = "Set the start time of selecting segments to merge")
-parser.add_argument("--end", default = 9999999999, help = "Set the end time of selecting segments to merge")
+parser.add_argument("--start", default = None, help = "Set the start time of selecting segments to merge")
+parser.add_argument("--end", default = None, help = "Set the end time of selecting segments to merge")
+parser.add_argument("--invert", default = False, action = "store_true", help = "Set whether you want to generate an inverse segments file as well")
 parser.add_argument("-o", "--output", metavar = "file", default = "segments.xml.gz", help = "Set the name of the segments file generated. Default: segments.xml.gz")
+parser.add_argument("--inverted-output", metavar = "file", default = "segments_inverted.xml.gz", help = "Set the name of the inverted segments file generated. Default: segments_inverted.xml.gz")
 parser.add_argument("--verbose", action = "store_true", help = "Be verbose (optional).")
 args = parser.parse_args()
 
+if args.invert:
+	assert args.start is not None, "Must provide a start time if invert segments are requested"
+	assert args.end is not None, "Must provide an end time if invert segments are requested"
+else:
+	if args.start is None:
+		args.start = 0
+	if args.end is None:
+		args.end = 9999999999
+
 path = args.analysis_dir
 name = args.segments_name
-#svdbin = args.svdbin
+if args.svd_bin:
+	if isinstance(args.svd_bin, int):
+		svd_bin = format(args.svd_bin, '04')
+	elif isinstance(args.svd_bin, str):
+		svd_bin = args.svd_bin
+else:
+	svd_bin = "[0-9][0-9][0-9][0-9]"
+
 if args.subdir == "None":
-	files = glob.glob(f'{path}/[0-9][0-9][0-9][0-9][0-9]/H1L1V1-[0-9][0-9][0-9][0-9]_noninj_SEGMENTS-*.xml.gz')
+	files = glob.glob(f'{path}/[0-9][0-9][0-9][0-9][0-9]/H1L1V1-{svd_bin}_noninj_SEGMENTS-*.xml.gz')
 else:
-	files = glob.glob(f'{path}/{args.subdir}/H1L1V1-[0-9][0-9][0-9][0-9]_noninj_SEGMENTS-*.xml.gz')
+	files = glob.glob(f'{path}/{args.subdir}/H1L1V1-{svd_bin}_noninj_SEGMENTS-*.xml.gz')
 
-#print(files)
-print('Num files:', len(files))
+if args.verbose:
+	print('Num files:', len(files))
 
 first = True
@@ -52,7 +71,9 @@ for filename in files:
 	seglist = ligolw_segments.segmenttable_get_by_name(ligolw_utils.load_filename(filename, contenthandler = ligolw_segments.LIGOLWContentHandler, verbose=args.verbose), name=f'{name}').coalesce()
 	seglist_end |= seglist
 
-print('SegTable:',seglist_end)
+if args.verbose:
+	print('SegTable:',seglist_end)
+
 
 #
 # Write segments to disk
@@ -67,3 +88,18 @@ process.set_end_time_now()
 ligolw_utils.write_filename(xmldoc, args.output, verbose=args.verbose)
 
 xmldoc.unlink()
+
+
+if args.invert:
+	total_segments = segmentlistdict({ifo: segmentlist([segment(LIGOTimeGPS(args.start), LIGOTimeGPS(args.end))]) for ifo in ['H1', 'L1', 'V1']})
+	inverted_segments = total_segments - seglist_end
+
+	xmldoc = ligolw.Document()
+	xmldoc.appendChild(ligolw.LIGO_LW())
+	process = ligolw_process.register_to_xmldoc(xmldoc, sys.argv[0], vars(args))
+	with ligolw_segments.LigolwSegments(xmldoc, process) as lwseglists:
+		lwseglists.insert_from_segmentlistdict(inverted_segments, "datasegments")
+	process.set_end_time_now()
+
+	ligolw_utils.write_filename(xmldoc, args.inverted_output, verbose=args.verbose)
+	xmldoc.unlink()
diff --git a/gstlal/python/datafind.py b/gstlal/python/datafind.py
index 6d574bfa2bb1f80eff1368fb9af29c2c488c9780..b53080614848389d288b319bd79751e43adbb0b8 100644
--- a/gstlal/python/datafind.py
+++ b/gstlal/python/datafind.py
@@ -25,6 +25,8 @@ import os
 import gwdatafind
 from lal.utils import CacheEntry
 from ligo.segments import segment, segmentlist, segmentlistdict
+from ligo.lw import utils as ligolw_utils
+from ligo.lw.utils import segments as ligolw_segments
 
 DEFAULT_DATAFIND_SERVER = os.getenv('LIGO_DATAFIND_SERVER', 'ldr.ldas.cit:80')
 
@@ -264,17 +266,63 @@ class DataCache:
 # since they don't match the format of offline triggers.
 # Remove this once the online alaysis has been restructured to be similar to the offline one,
 # and the old online analyses are not relevant anymore
-def find_online(name, start=None, end=None, root = None, extension = None, subtype = None):
+def find_online(name, start=0, end=9999999999, root = None, svd_bins = ['*'], extension = "xml.gz", subtype = None, segments = None, output_as_list = False):
 	cache = []
 	# get all the gps_dirs
 	dirs = os.listdir(root)
 	dirs = [d for d in dirs if os.path.isdir(os.path.join(root, d))]
 	dirs = [d for d in dirs if d.isdigit()]
+
 	if subtype == 'TRIGGERS':
+		if isinstance(svd_bins, int):
+			svd_bins = [format(svd_bins, '04')]
+		elif isinstance(svd_bins, str):
+			svd_bins = [svd_bins]
+
+		# have the ability to provide different start and end times for different svd_bins
+		if isinstance(start, dict):
+			assert set(start.keys()) == set(svd_bins), "start dict contains different keys than svd_bins"
+		else:
+			start = {svd_bin: start for svd_bin in svd_bins}
+		if isinstance(end, dict):
+			assert set(end.keys()) == set(svd_bins), "end dict contains different keys than svd_bins"
+		else:
+			end = {svd_bin: end for svd_bin in svd_bins}
+
+		# have the ability to provide different segments files for different svd_bin
+		# a trigger file will be "found" only if it lies entirely within a segment for that svd_bin
+		segdict_by_bin = {}
+		if isinstance(segments, str):
+			seg = ligolw_segments.segmenttable_get_by_name(ligolw_utils.load_filename(segments, contenthandler = ligolw_segments.LIGOLWContentHandler), name="datasegments").coalesce()
+			for svd_bin in svd_bins:
+				segdict_by_bin[svd_bin] = seg
+		elif isinstance(segments, DataCache):
+			segments = segments.groupby("bin")
+			assert set(segments.keys()) == set(svd_bins), "segments cache contains different bins than svd_bins"
+			for svd_bin in svd_bins:
+				assert len(segments[svd_bin].files) == 1, "can only provide one segments file per svd_bin"
+				segdict_by_bin[svd_bin] = ligolw_segments.segmenttable_get_by_name(ligolw_utils.load_filename(segments[svd_bin].files[0], contenthandler = ligolw_segments.LIGOLWContentHandler), name="datasegments").coalesce()
+
 		for d in dirs:
-			files = glob.glob(os.path.join(root, d, f"*-*_{name}-*-*.*"))
-			files = [f for f in files if int(CacheEntry.from_T050017(f).segment[0]) > start and int(CacheEntry.from_T050017(f).segment[0]) < end]
-			cache.extend(files)
+			files = glob.glob(os.path.join(root, d, f"*-*_{name}-*-*.{extension}"))
+			files_filtered = []
+			for f in files:
+				c = CacheEntry.from_T050017(f)
+				svd_bin = c.description[:4] if '*' not in svd_bins else '*'
+				if int(c.segment[0]) > start[svd_bin] and int(c.segment[1]) < end[svd_bin]:
+					if segdict_by_bin:
+						# check if the file lies entirely within the provided segments for that svd_bin
+						inside = False
+						segdict_by_ifo = segdict_by_bin[svd_bin]
+						for ifo in segdict_by_ifo.keys():
+							if c.segment in segdict_by_ifo[ifo]:
+								inside = True
+						if inside:
+							files_filtered.append(f)
+					else:
+						files_filtered.append(f)
+			cache.extend(files_filtered)
+
 	elif subtype == 'DIST_STATS':
 		# find the greatest gps dir less than the end time
 		gps_dir = max(filter(lambda x: x < end, [int(d) for d in dirs]))
@@ -295,7 +343,7 @@ def find_online(name, start=None, end=None, root = None, extension = None, subty
 			binnum = format(binnum, '04')
 			found = 0
 			for gpsd in sorted([d for d in dirs if int(d) <= gps_dir], reverse = True):
-				files = glob.glob(os.path.join(root, gpsd, f"*-{binnum}_{name}-*-*.*"))
+				files = glob.glob(os.path.join(root, gpsd, f"*-{binnum}_{name}-*-*.{extension}"))
 				#files = [CacheEntry.from_T050017(f) for f in files]
 				filedict = {int(CacheEntry.from_T050017(f).segment[0]): f for f in files}
 				try:
@@ -307,8 +355,13 @@ def find_online(name, start=None, end=None, root = None, extension = None, subty
 					break
 			if found == 0:
 				raise ValueError(f"Couldn't find {name} files with end time less than {end}")
+
 	else:
-		raise ValueError('The subtype "%s" is not defined. Choose one from "TRIGGERS" or "DIST_STATS".' % subtype)
+		raise ValueError('The subtype "%s" is not defined. Choose one from "TRIGGERS", "DIST_STATS" or "SEGMENTS".' % subtype)
+
+	typedict = {'TRIGGERS': DataType.TRIGGERS, 'DIST_STATS': DataType.DIST_STATS}
+	if not output_as_list:
+		cache = DataCache.from_files(typedict[subtype], cache)
+
 	return cache
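
Usage note (illustrative sketch, not part of the patch): the snippet below shows how the
reworked find_online() is intended to be driven from a rerank workflow, mirroring the
gstlal_inspiral_workflow changes above. The name strings, root directories and GPS times
are placeholders and not defined by this patch; only find_online, DataCache and DataType
are real interfaces from gstlal.datafind.

    from gstlal.datafind import DataCache, DataType, find_online

    # Per-bin end times taken from the marginalized dist stats, capped at the analysis
    # stop time (mirrors the bin_dict construction in gstlal_inspiral_workflow).
    dist_stats = DataCache.find(DataType.MARG_DIST_STATS, root="rank", svd_bins="*")
    stop = 1400000000  # hypothetical analysis stop time
    bin_dict = {c.description[:4]: min(int(c.segment[1]), stop) for c in dist_stats.cache}

    # Per-bin segments first, then online triggers restricted to lie inside those segments.
    segments = find_online("SEGMENTS", root="rerank/segments", svd_bins=bin_dict.keys(), subtype="TRIGGERS")
    triggers = find_online("LLOID_TRIGGERS", start=1300000000, end=bin_dict, root="analysis", svd_bins=bin_dict.keys(), subtype="TRIGGERS", segments=segments)

Passing a dict for end (or start) applies a different cutoff per SVD bin, while a scalar
is broadcast to every bin; segments may be either a single XML file path or a per-bin
DataCache, as handled in the new TRIGGERS branch above.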
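Usage note on gstlal_ll_merge_segments (illustrative, not part of the patch): with the new
options, --invert requires explicit --start and --end values (the asserts reject missing
bounds), so an inverted-segments run would look something like
gstlal_ll_merge_segments --analysis-dir <analysis dir> --svd-bin 0000 --start <gps start> --end <gps stop> --invert --output segments.xml.gz --inverted-output segments_inverted.xml.gz,
where the directory and GPS values are placeholders. Without --invert, --start and --end
still default to 0 and 9999999999 as before.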