diff --git a/gstlal-inspiral/bin/gstlal_inspiral_pipe b/gstlal-inspiral/bin/gstlal_inspiral_pipe index 97798028a5dcde914e9b09205b6aaa7f8764963b..cbf2b0258e2aae58283cf9b4ce60cd6d6bf0282b 100755 --- a/gstlal-inspiral/bin/gstlal_inspiral_pipe +++ b/gstlal-inspiral/bin/gstlal_inspiral_pipe @@ -43,23 +43,17 @@ __author__ = 'Chad Hanna <chad.hanna@ligo.org>, Patrick Godwin <patrick.godwin@l ### imports from collections import OrderedDict -import functools -import itertools -import os from optparse import OptionParser -import stat +import os import numpy -import lal.series from lal.utils import CacheEntry -from ligo import segments from ligo.lw import ligolw from ligo.lw import lsctables import ligo.lw.utils as ligolw_utils -from gstlal import inspiral from gstlal import inspiral_pipe from gstlal import dagparts from gstlal import datasource @@ -74,150 +68,6 @@ class LIGOLWContentHandler(ligolw.LIGOLWContentHandler): lsctables.use_in(LIGOLWContentHandler) -#---------------------------------------------------------- -### utility functions - -def sim_tag_from_inj_file(injections): - if injections is None: - return None - return injections.replace('.xml', '').replace('.gz', '').replace('-','_') - -def get_bank_params(options, verbose = False): - bank_cache = {} - for bank_cache_str in options.bank_cache: - for c in bank_cache_str.split(','): - ifo = c.split("=")[0] - cache = c.replace(ifo+"=","") - bank_cache.setdefault(ifo, []).append(cache) - - max_time = 0 - template_mchirp_dict = {} - for n, cache in enumerate(bank_cache.values()[0]): - for ce in map(CacheEntry, open(cache)): - for ce in map(CacheEntry, open(ce.path)): - xmldoc = ligolw_utils.load_filename(ce.path, verbose = verbose, contenthandler = LIGOLWContentHandler) - snglinspiraltable = lsctables.SnglInspiralTable.get_table(xmldoc) - max_time = max(max_time, max(snglinspiraltable.getColumnByName('template_duration'))) - idx = options.overlap[n]/2 - template_mchirp_dict[ce.path] = [min(snglinspiraltable.getColumnByName('mchirp')[idx:-idx]), max(snglinspiraltable.getColumnByName('mchirp')[idx:-idx])] - xmldoc.unlink() - - return template_mchirp_dict, bank_cache, max_time - -def subdir_path(dirlist): - output_path = '/'.join(dirlist) - try: - os.mkdir(output_path) - except: - pass - return output_path - -def analysis_segments(analyzable_instruments_set, allsegs, boundary_seg, max_template_length, min_instruments = 2): - """get a dictionary of all the disjoint 2+ detector combination segments - """ - segsdict = segments.segmentlistdict() - # 512 seconds for the whitener to settle + the maximum template_length FIXME don't hard code - start_pad = 512 + max_template_length - # Chosen so that the overlap is only a ~5% hit in run time for long segments... 
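To make the padding arithmetic that follows concrete (an illustrative aside, not part of the patch; the 128 s template length is a hypothetical value):

# each analysis segment is protracted by the whitener settling time plus the longest template
max_template_length = 128.0               # hypothetical, read from the template bank
start_pad = 512 + max_template_length     # 640 s of padding at each segment boundary
segment_length = int(5 * start_pad)       # 3200 s chunks handed to the inspiral jobs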
- segment_length = int(5 * start_pad) - for n in range(min_instruments, 1 + len(analyzable_instruments_set)): - for ifo_combos in itertools.combinations(list(analyzable_instruments_set), n): - segsdict[frozenset(ifo_combos)] = allsegs.intersection(ifo_combos) - allsegs.union(analyzable_instruments_set - set(ifo_combos)) - segsdict[frozenset(ifo_combos)] &= segments.segmentlist([boundary_seg]) - segsdict[frozenset(ifo_combos)] = segsdict[frozenset(ifo_combos)].protract(start_pad) - segsdict[frozenset(ifo_combos)] = dagparts.breakupsegs(segsdict[frozenset(ifo_combos)], segment_length, start_pad) - if not segsdict[frozenset(ifo_combos)]: - del segsdict[frozenset(ifo_combos)] - return segsdict - -def create_svd_bank_strings(svd_nodes, instruments = None): - # FIXME assume that the number of svd nodes is the same per ifo, a good assumption though - outstrings = [] - for i in range(len(svd_nodes.values()[0])): - svd_bank_string = "" - for ifo in svd_nodes: - if instruments is not None and ifo not in instruments: - continue - try: - svd_bank_string += "%s:%s," % (ifo, svd_nodes[ifo][i].output_files["write-svd"]) - except AttributeError: - svd_bank_string += "%s:%s," % (ifo, svd_nodes[ifo][i]) - svd_bank_string = svd_bank_string.strip(",") - outstrings.append(svd_bank_string) - return outstrings - -def svd_bank_cache_maker(svd_bank_strings, injection = False): - if injection: - dir_name = "gstlal_inspiral_inj" - else: - dir_name = "gstlal_inspiral" - svd_cache_entries = [] - parsed_svd_bank_strings = [inspiral.parse_svdbank_string(single_svd_bank_string) for single_svd_bank_string in svd_bank_strings] - for svd_bank_parsed_dict in parsed_svd_bank_strings: - for filename in svd_bank_parsed_dict.itervalues(): - svd_cache_entries.append(CacheEntry.from_T050017(filename)) - - return [svd_cache_entry.url for svd_cache_entry in svd_cache_entries] - -def adapt_gstlal_inspiral_output(inspiral_nodes, options, segsdict): - # first get the previous output in a usable form - lloid_output = {} - for inj in options.injections + [None]: - lloid_output[sim_tag_from_inj_file(inj)] = {} - lloid_diststats = {} - if options.dist_stats_cache: - for ce in map(CacheEntry, open(options.dist_stats_cache)): - lloid_diststats[ce.description.split("_")[0]] = [ce.path] - for ifos in segsdict: - for seg in segsdict[ifos]: - # iterate over the mass space chunks for each segment - for node in inspiral_nodes[(ifos, None)][seg]: - if node is None: - break - len_out_files = len(node.output_files["output-cache"]) - for f in node.output_files["output-cache"]: - # Store the output files and the node for use as a parent dependency - lloid_output[None].setdefault(CacheEntry.from_T050017(f).description.split("_")[0], []).append((f, [node])) - for f in node.output_files["ranking-stat-output-cache"]: - lloid_diststats.setdefault(CacheEntry.from_T050017(f).description.split("_")[0] ,[]).append(f) - for inj in options.injections: - for injnode in inspiral_nodes[(ifos, sim_tag_from_inj_file(inj))][seg]: - if injnode is None: - continue - for f in injnode.output_files["output-cache"]: - # Store the output files and the node and injnode for use as a parent dependencies - bgbin_index = CacheEntry.from_T050017(f).description.split("_")[0] - try: - lloid_output[sim_tag_from_inj_file(inj)].setdefault(bgbin_index, []).append((f, lloid_output[None][bgbin_index][-1][1]+[injnode])) - except KeyError: - lloid_output[sim_tag_from_inj_file(inj)].setdefault(bgbin_index, []).append((f, [injnode])) - - return lloid_output, lloid_diststats - -def 
set_up_scripts(options): - # Make an xml integrity checker - if options.gzip_test: - with open("gzip_test.sh", "w") as f: - f.write("#!/bin/bash\nsleep 60\ngzip --test $@") - os.chmod("gzip_test.sh", stat.S_IRUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH | stat.S_IWUSR) - - # A pre script to backup data before feeding to lossy programs - # (e.g. clustering routines) - with open("store_raw.sh", "w") as f: - f.write("""#!/bin/bash - for f in $@;do mkdir -p $(dirname $f)/raw;cp $f $(dirname $f)/raw/$(basename $f);done""") - os.chmod("store_raw.sh", stat.S_IRUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH | stat.S_IWUSR) - -def load_reference_psd(options): - ref_psd = lal.series.read_psd_xmldoc(ligolw_utils.load_filename(options.reference_psd, verbose = options.verbose, contenthandler = lal.series.PSDContentHandler)) - - # FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache - with open('reference_psd.cache', "w") as output_cache_file: - output_cache_file.write("%s\n" % CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(options.reference_psd))) - - return ref_psd - - #---------------------------------------------------------- ### command line options @@ -376,38 +226,6 @@ def parse_command_line(): #---------------------------------------------------------- ### DAG utilities -def get_threshold_values(bgbin_indices, svd_bank_strings, options): - """Calculate the appropriate ht-gate-threshold values according to the scale given - """ - if options.ht_gate_threshold_linear is not None: - # A scale is given - mchirp_min, ht_gate_threshold_min, mchirp_max, ht_gate_threshold_max = [float(y) for x in options.ht_gate_threshold_linear.split("-") for y in x.split(":")] - # use max mchirp in a given svd bank to decide gate threshold - bank_mchirps = [template_mchirp_dict[bgbin_index][1] for bgbin_index in bgbin_indices] - gate_mchirp_ratio = (ht_gate_threshold_max - ht_gate_threshold_min)/(mchirp_max - mchirp_min) - return [gate_mchirp_ratio*(bank_mchirp - mchirp_min) + ht_gate_threshold_min for bank_mchirp in bank_mchirps] - elif options.ht_gate_threshold is not None: - return [options.ht_gate_threshold]*len(svd_bank_strings) # Use the ht-gate-threshold value given - else: - return None - -def inputs_to_db(jobs, inputs, job_type = 'toSqlite'): - dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in inputs] - db = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite') - return os.path.join(subdir_path([jobs[job_type].output_path, CacheEntry.from_T050017(db).description[:4]]), db) - -def cache_to_db(cache, jobs): - hi_index = cache[-1].description.split('_')[0] - db = os.path.join(jobs['toSqlite'].output_path, os.path.basename(cache[-1].path)) - db.replace(hi_index, '%04d' % ((int(hi_index) + 1) / options.num_files_per_background_bin - 1,)) - return db - -def get_rank_file(instruments, boundary_seg, n, basename, job=None): - if job: - return dagparts.T050017_filename(instruments, '_'.join([n, basename]), boundary_seg, '.xml.gz', path = job.output_path) - else: - return dagparts.T050017_filename(instruments, '_'.join([n, basename]), boundary_seg, '.cache') - def set_up_jobs(options): jobs = {} @@ -519,910 +337,6 @@ def set_up_jobs(options): return jobs -#---------------------------------------------------------- -### DAG layers - -def ref_psd_layer(dag, jobs, parent_nodes, segsdict, channel_dict, options): - psd_nodes = {} - for ifos in segsdict: - 
this_channel_dict = dict((k, channel_dict[k]) for k in ifos if k in channel_dict) - for seg in segsdict[ifos]: - psd_path = subdir_path([jobs['refPSD'].output_path, str(int(seg[0]))[:5]]) - psd_nodes[(ifos, seg)] = dagparts.DAGNode( - jobs['refPSD'], - dag, - parent_nodes = parent_nodes, - opts = { - "gps-start-time":int(seg[0]), - "gps-end-time":int(seg[1]), - "data-source":"frames", - "channel-name":datasource.pipeline_channel_list_from_channel_dict(this_channel_dict, ifos = ifos), - "psd-fft-length":options.psd_fft_length, - "frame-segments-name": options.frame_segments_name - }, - input_files = { - "frame-cache":options.frame_cache, - "frame-segments-file":options.frame_segments_file - }, - output_files = { - "write-psd":dagparts.T050017_filename(ifos, "REFERENCE_PSD", seg, '.xml.gz', path = psd_path) - }, - ) - - # Make the reference PSD cache - # FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache - with open('reference_psd.cache', "w") as output_cache_file: - for node in psd_nodes.values(): - output_cache_file.write("%s\n" % CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(node.output_files["write-psd"]))) - - return psd_nodes - -def median_psd_layer(dag, jobs, parent_nodes, *args): - gpsmod5 = str(int(boundary_seg[0]))[:5] - median_psd_path = subdir_path([jobs['medianPSD'].output_path, gpsmod5]) - - # FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache - median_psd_nodes = [] - for chunk, nodes in enumerate(dagparts.groups(parent_nodes.values(), 50)): - median_psd_node = \ - dagparts.DAGNode(jobs['medianPSD'], dag, - parent_nodes = parent_nodes.values(), - input_files = {"": [node.output_files["write-psd"] for node in nodes]}, - output_files = {"output-name": dagparts.T050017_filename(instruments, "REFERENCE_PSD_CHUNK_%04d" % chunk, boundary_seg, '.xml.gz', path = median_psd_path)} - ) - median_psd_nodes.append(median_psd_node) - - median_psd_node = \ - dagparts.DAGNode(jobs['medianPSD'], dag, - parent_nodes = median_psd_nodes, - input_files = {"": [node.output_files["output-name"] for node in median_psd_nodes]}, - output_files = {"output-name": dagparts.T050017_filename(instruments, "REFERENCE_PSD", boundary_seg, '.xml.gz', path = subdir_path([jobs['medianPSD'].output_path, gpsmod5]))} - ) - return median_psd_node - -def svd_layer(dag, jobs, parent_nodes, psd, bank_cache, options, seg, template_mchirp_dict): - svd_nodes = {} - new_template_mchirp_dict = {} - svd_dtdphi_map = {} - for ifo, list_of_svd_caches in bank_cache.items(): - bin_offset = 0 - for j, svd_caches in enumerate(list_of_svd_caches): - svd_caches = map(CacheEntry, open(svd_caches)) - for i, individual_svd_cache in enumerate(ce.path for ce in svd_caches): - # First sort out the clipleft, clipright options - clipleft = [] - clipright = [] - ids = [] - mchirp_interval = (float("inf"), 0) - individual_svd_cache = map(CacheEntry, open(individual_svd_cache)) - for n, f in enumerate(ce.path for ce in individual_svd_cache): - # handle template bank clipping - clipleft.append(options.overlap[j] / 2) - clipright.append(options.overlap[j] / 2) - ids.append("%d_%d" % (i+bin_offset, n)) - if f in template_mchirp_dict: - mchirp_interval = (min(mchirp_interval[0], template_mchirp_dict[f][0]), max(mchirp_interval[1], template_mchirp_dict[f][1])) - svd_dtdphi_map["%04d" % (i+bin_offset)] = options.dtdphi_file[j] - - svd_bank_name = dagparts.T050017_filename(ifo, '%04d_SVD' % (i+bin_offset,), seg, '.xml.gz', path = jobs['svd'].output_path) - if '%04d' % (i+bin_offset,) not in 
new_template_mchirp_dict and mchirp_interval != (float("inf"), 0): - new_template_mchirp_dict['%04d' % (i+bin_offset,)] = mchirp_interval - - svdnode = dagparts.DAGNode( - jobs['svd'], - dag, - parent_nodes = parent_nodes, - opts = { - "svd-tolerance":options.tolerance, - "flow":options.flow[j], - "sample-rate":options.sample_rate, - "clipleft":clipleft, - "clipright":clipright, - "samples-min":options.samples_min[j], - "samples-max-256":options.samples_max_256, - "samples-max-64":options.samples_max_64, - "samples-max":options.samples_max, - "autocorrelation-length":options.autocorrelation_length, - "bank-id":ids, - "identity-transform":options.identity_transform, - "ortho-gate-fap":0.5 - }, - input_files = {"reference-psd":psd}, - input_cache_files = {"template-bank-cache":[ce.path for ce in individual_svd_cache]}, - input_cache_file_name = os.path.basename(svd_bank_name).replace(".xml.gz", ".cache"), - output_files = {"write-svd":svd_bank_name}, - ) - - # impose a priority to help with depth first submission - svdnode.set_priority(99) - svd_nodes.setdefault(ifo, []).append(svdnode) - bin_offset += i+1 - - # Plot template/svd bank jobs - primary_ifo = bank_cache.keys()[0] - dagparts.DAGNode( - jobs['plotBanks'], - dag, - parent_nodes = sum(svd_nodes.values(),[]), - opts = {"plot-template-bank":"", "output-dir": output_dir}, - input_files = {"template-bank-file":options.template_bank}, - ) - - return svd_nodes, new_template_mchirp_dict, svd_dtdphi_map - -def inspiral_layer(dag, jobs, svd_nodes, segsdict, options, channel_dict, template_mchirp_dict): - inspiral_nodes = {} - for ifos in segsdict: - # FIXME: handles more than 3 ifos with same cpu/memory requests - inspiral_name = 'gstlalInspiral%dIFO' % min(len(ifos), 3) - inspiral_inj_name = 'gstlalInspiralInj%dIFO' % min(len(ifos), 3) - - # setup dictionaries to hold the inspiral nodes - inspiral_nodes[(ifos, None)] = {} - ignore = {} - injection_files = [] - for injections in options.injections: - min_chirp_mass, max_chirp_mass, injections = injections.split(':') - injection_files.append(injections) - min_chirp_mass, max_chirp_mass = float(min_chirp_mass), float(max_chirp_mass) - inspiral_nodes[(ifos, sim_tag_from_inj_file(injections))] = {} - ignore[injections] = [] - for bgbin_index, bounds in sorted(template_mchirp_dict.items(), key = lambda (k,v): int(k)): - if max_chirp_mass <= bounds[0]: - ignore[injections].append(int(bgbin_index)) - # NOTE putting a break here assumes that the min chirp mass - # in a subbank increases with bin number, i.e. XXXX+1 has a - # greater minimum chirpmass than XXXX, for all XXXX. Note - # that the reverse is not true, bin XXXX+1 may have a lower - # max chirpmass than bin XXXX. - elif min_chirp_mass > bounds[1]: - ignore[injections].append(int(bgbin_index)) - - # FIXME choose better splitting? 
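As a concrete reading of the chunking that follows (illustrative only, not part of the patch): with numchunks = 50 the first gstlal_inspiral job bundles SVD bins 0000-0049, the next 0050-0099, and so on, and the four-digit bin label is rebuilt from the chunk counter as:

numchunks = 50
chunk_counter, i = 1, 2                                    # hypothetical: third bank string of the second chunk
bgbin_index = '%04d' % (i + numchunks * chunk_counter,)    # -> '0052'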
- numchunks = 50 - - # only use a channel dict with the relevant channels - this_channel_dict = dict((k, channel_dict[k]) for k in ifos if k in channel_dict) - - # get the svd bank strings - svd_bank_strings_full = create_svd_bank_strings(svd_nodes, instruments = this_channel_dict.keys()) - - # get a mapping between chunk counter and bgbin for setting priorities - bgbin_chunk_map = {} - - for seg in segsdict[ifos]: - if injection_files: - output_seg_inj_path = subdir_path([jobs[inspiral_inj_name].output_path, str(int(seg[0]))[:5]]) - - if jobs[inspiral_name] is None: - # injection-only run - inspiral_nodes[(ifos, None)].setdefault(seg, [None]) - - else: - output_seg_path = subdir_path([jobs[inspiral_name].output_path, str(int(seg[0]))[:5]]) - for chunk_counter, svd_bank_strings in enumerate(dagparts.groups(svd_bank_strings_full, numchunks)): - bgbin_indices = ['%04d' % (i + numchunks * chunk_counter,) for i,s in enumerate(svd_bank_strings)] - # setup output names - output_paths = [subdir_path([output_seg_path, bgbin_indices[i]]) for i, s in enumerate(svd_bank_strings)] - output_names = [dagparts.T050017_filename(ifos, '%s_LLOID' % idx, seg, '.xml.gz', path = path) for idx, path in zip(bgbin_indices, output_paths)] - dist_stat_names = [dagparts.T050017_filename(ifos, '%s_DIST_STATS' % idx, seg, '.xml.gz', path = path) for idx, path in zip(bgbin_indices, output_paths)] - - for bgbin in bgbin_indices: - bgbin_chunk_map.setdefault(bgbin, chunk_counter) - - # Calculate the appropriate ht-gate-threshold values according to the scale given - threshold_values = get_threshold_values(bgbin_indices, svd_bank_strings, options) - - # non injection node - noninjnode = dagparts.DAGNode(jobs[inspiral_name], dag, - parent_nodes = sum((svd_node_list[numchunks*chunk_counter:numchunks*(chunk_counter+1)] for svd_node_list in svd_nodes.values()),[]), - opts = { - "psd-fft-length":options.psd_fft_length, - "ht-gate-threshold":threshold_values, - "frame-segments-name":options.frame_segments_name, - "gps-start-time":int(seg[0]), - "gps-end-time":int(seg[1]), - "channel-name":datasource.pipeline_channel_list_from_channel_dict(this_channel_dict), - "tmp-space":dagparts.condor_scratch_space(), - "track-psd":"", - "control-peak-time":options.control_peak_time, - "coincidence-threshold":options.coincidence_threshold, - "singles-threshold":options.singles_threshold, - "fir-stride":options.fir_stride, - "data-source":"frames", - "local-frame-caching":"", - "min-instruments":options.min_instruments, - "reference-likelihood-file":options.reference_likelihood_file - }, - input_files = { - "time-slide-file":options.time_slide_file, - "frame-cache":options.frame_cache, - "frame-segments-file":options.frame_segments_file, - "reference-psd":psd_nodes[(ifos, seg)].output_files["write-psd"], - "blind-injections":options.blind_injections, - "veto-segments-file":options.vetoes, - }, - input_cache_files = {"svd-bank-cache":svd_bank_cache_maker(svd_bank_strings)}, - output_cache_files = { - "output-cache":output_names, - "ranking-stat-output-cache":dist_stat_names - } - ) - - # Set a post script to check for file integrity - if options.gzip_test: - noninjnode.set_post_script("gzip_test.sh") - noninjnode.add_post_script_arg(" ".join(output_names + dist_stat_names)) - - # impose a priority to help with depth first submission - noninjnode.set_priority(chunk_counter+15) - - inspiral_nodes[(ifos, None)].setdefault(seg, []).append(noninjnode) - - # process injections - for injections in injection_files: - # setup output names - sim_name = 
sim_tag_from_inj_file(injections) - - bgbin_svd_bank_strings = [bgbin_svdbank for i, bgbin_svdbank in enumerate(zip(sorted(template_mchirp_dict.keys()), svd_bank_strings_full)) if i not in ignore[injections]] - - for chunk_counter, bgbin_list in enumerate(dagparts.groups(bgbin_svd_bank_strings, numchunks)): - bgbin_indices, svd_bank_strings = zip(*bgbin_list) - output_paths = [subdir_path([output_seg_inj_path, bgbin_index]) for bgbin_index in bgbin_indices] - output_names = [dagparts.T050017_filename(ifos, '%s_LLOID_%s' % (idx, sim_name), seg, '.xml.gz', path = path) for idx, path in zip(bgbin_indices, output_paths)] - svd_names = [s for i, s in enumerate(svd_bank_cache_maker(svd_bank_strings, injection = True))] - try: - reference_psd = psd_nodes[(ifos, seg)].output_files["write-psd"] - parents = [svd_node_list[int(bgbin_index)] for svd_node_list in svd_nodes.values() for bgbin_index in bgbin_indices] - except AttributeError: ### injection-only run - reference_psd = psd_nodes[(ifos, seg)] - parents = [] - - svd_files = [CacheEntry.from_T050017(filename) for filename in svd_names] - input_cache_name = dagparts.group_T050017_filename_from_T050017_files(svd_files, '.cache').replace('SVD', 'SVD_%s' % sim_name) - - # Calculate the appropriate ht-gate-threshold values according to the scale given - threshold_values = get_threshold_values(bgbin_indices, svd_bank_strings, options) - - # setup injection node - # FIXME: handles more than 3 ifos with same cpu/memory requests - injnode = dagparts.DAGNode(jobs[inspiral_inj_name], dag, - parent_nodes = parents, - opts = { - "psd-fft-length":options.psd_fft_length, - "ht-gate-threshold":threshold_values, - "frame-segments-name":options.frame_segments_name, - "gps-start-time":int(seg[0]), - "gps-end-time":int(seg[1]), - "channel-name":datasource.pipeline_channel_list_from_channel_dict(this_channel_dict), - "tmp-space":dagparts.condor_scratch_space(), - "track-psd":"", - "control-peak-time":options.control_peak_time, - "coincidence-threshold":options.coincidence_threshold, - "singles-threshold":options.singles_threshold, - "fir-stride":options.fir_stride, - "data-source":"frames", - "local-frame-caching":"", - "min-instruments":options.min_instruments, - "reference-likelihood-file":options.reference_likelihood_file - }, - input_files = { - "time-slide-file":options.inj_time_slide_file, - "frame-cache":options.frame_cache, - "frame-segments-file":options.frame_segments_file, - "reference-psd":reference_psd, - "veto-segments-file":options.vetoes, - "injections": injections - }, - input_cache_files = {"svd-bank-cache":svd_names}, - input_cache_file_name = input_cache_name, - output_cache_files = {"output-cache":output_names} - ) - # Set a post script to check for file integrity - if options.gzip_test: - injnode.set_post_script("gzip_test.sh") - injnode.add_post_script_arg(" ".join(output_names)) - - # impose a priority to help with depth first submission - if bgbin_chunk_map: - injnode.set_priority(bgbin_chunk_map[bgbin_indices[-1]]+1) - else: - injnode.set_priority(chunk_counter+1) - - inspiral_nodes[(ifos, sim_name)].setdefault(seg, []).append(injnode) - - # Replace mchirplo:mchirphi:inj.xml with inj.xml - options.injections = [inj.split(':')[-1] for inj in options.injections] - - # NOTE: Adapt the output of the gstlal_inspiral jobs to be suitable for the remainder of this analysis - lloid_output, lloid_diststats = adapt_gstlal_inspiral_output(inspiral_nodes, options, segsdict) - - return inspiral_nodes, lloid_output, lloid_diststats - -def 
expected_snr_layer(dag, jobs, ref_psd_parent_nodes, options, num_split_inj_snr_jobs): - ligolw_add_nodes = [] - for inj in options.injections: - inj_snr_nodes = [] - - inj_splitter_node = dagparts.DAGNode(jobs['injSplitter'], dag, parent_nodes=[], - opts = { - "output-path":jobs['injSplitter'].output_path, - "usertag": sim_tag_from_inj_file(inj.split(":")[-1]), - "nsplit": num_split_inj_snr_jobs - }, - input_files = {"": inj.split(":")[-1]} - ) - inj_splitter_node.set_priority(98) - - # FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache - injection_files = ["%s/%s_INJ_SPLIT_%04d.xml" % (jobs['injSplitter'].output_path, sim_tag_from_inj_file(inj.split(":")[-1]), i) for i in range(num_split_inj_snr_jobs)] - for injection_file in injection_files: - injSNRnode = dagparts.DAGNode(jobs['gstlalInjSnr'], dag, parent_nodes=ref_psd_parent_nodes + [inj_splitter_node], - # FIXME somehow choose the actual flow based on mass? - # max(flow) is chosen for performance not - # correctness hopefully though it will be good - # enough - opts = {"flow":max(options.flow),"fmax":options.fmax}, - input_files = { - "injection-file": injection_file, - "reference-psd-cache": "reference_psd.cache" - } - ) - injSNRnode.set_priority(98) - inj_snr_nodes.append(injSNRnode) - - addnode = dagparts.DAGNode(jobs['ligolwAdd'], dag, parent_nodes=inj_snr_nodes, - input_files = {"": ' '.join(injection_files)}, - output_files = {"output": inj.split(":")[-1]} - ) - - ligolw_add_nodes.append(dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [addnode], - opts = {"sql-file":options.injection_proc_sql_file, "tmp-space":dagparts.condor_scratch_space()}, - input_files = {"":addnode.output_files["output"]} - ) - ) - return ligolw_add_nodes - -def summary_plot_layer(dag, jobs, parent_nodes, options, injdbs, noninjdb): - plotnodes = [] - - ### common plot options - common_plot_opts = { - "segments-name": options.frame_segments_name, - "tmp-space": dagparts.condor_scratch_space(), - "output-dir": output_dir, - "likelihood-file":"post_marginalized_likelihood.xml.gz", - "shrink-data-segments": 32.0, - "extend-veto-segments": 8., - } - sensitivity_opts = { - "output-dir":output_dir, - "tmp-space":dagparts.condor_scratch_space(), - "veto-segments-name":"vetoes", - "bin-by-source-type":"", - "dist-bins":200, - "data-segments-name":"datasegments" - } - - ### plot summary - opts = {"user-tag": "ALL_LLOID_COMBINED", "remove-precession": ""} - opts.update(common_plot_opts) - plotnodes.append(dagparts.DAGNode(jobs['plotSummary'], dag, parent_nodes=[farnode], - opts = opts, - input_files = {"": [noninjdb] + injdbs} - )) - - ### isolated precession plot summary - opts = {"user-tag": "PRECESSION_LLOID_COMBINED", "isolate-precession": "", "plot-group": 1} - opts.update(common_plot_opts) - plotnodes.append(dagparts.DAGNode(jobs['plotSummaryIsolatePrecession'], dag, parent_nodes=[farnode], - opts = opts, - input_files = {"":[noninjdb] + injdbs} - )) - - for injdb in injdbs: - ### individual injection plot summary - opts = {"user-tag": injdb.replace(".sqlite","").split("-")[1], "remove-precession": "", "plot-group": 1} - opts.update(common_plot_opts) - plotnodes.append(dagparts.DAGNode(jobs['plotSnglInjSummary'], dag, parent_nodes=[farnode], - opts = opts, - input_files = {"":[noninjdb] + [injdb]} - )) - - ### isolated precession injection plot summary - opts = {"user-tag": injdb.replace(".sqlite","").split("-")[1].replace("ALL_LLOID","PRECESSION_LLOID"), "isolate-precession": "", "plot-group": 1} - 
opts.update(common_plot_opts) - plotnodes.append(dagparts.DAGNode(jobs['plotSnglInjSummaryIsolatePrecession'], dag, parent_nodes=[farnode], - opts = opts, - input_files = {"":[noninjdb] + [injdb]} - )) - - ### sensitivity plots - opts = {"user-tag": "ALL_LLOID_COMBINED"} - opts.update(sensitivity_opts) - plotnodes.append(dagparts.DAGNode(jobs['plotSensitivity'], dag, parent_nodes=[farnode], - opts = opts, - input_files = {"zero-lag-database": noninjdb, "": injdbs} - )) - - for injdb in injdbs: - opts = {"user-tag": injdb.replace(".sqlite","").split("-")[1]} - opts.update(sensitivity_opts) - plotnodes.append(dagparts.DAGNode(jobs['plotSensitivity'], dag, parent_nodes=[farnode], - opts = opts, - input_files = {"zero-lag-database": noninjdb, "": injdb} - )) - - ### background plots - plotnodes.append(dagparts.DAGNode(jobs['plotBackground'], dag, parent_nodes = [farnode], - opts = {"user-tag":"ALL_LLOID_COMBINED", "output-dir":output_dir}, - input_files = {"":"post_marginalized_likelihood.xml.gz", "database":noninjdb} - )) - - return plotnodes - -def clean_merger_products_layer(dag, jobs, plotnodes, dbs_to_delete, margfiles_to_delete): - """clean intermediate merger products - """ - for db in dbs_to_delete: - dagparts.DAGNode(jobs['rm'], dag, parent_nodes = plotnodes, - input_files = {"": db} - ) - - for margfile in margfiles_to_delete: - dagparts.DAGNode(jobs['rm'], dag, parent_nodes = plotnodes, - input_files = {"": margfile} - ) - return None - -def inj_psd_layer(segsdict, options): - psd_nodes = {} - psd_cache_files = {} - for ce in map(CacheEntry, open(options.psd_cache)): - psd_cache_files.setdefault(frozenset(lsctables.instrumentsproperty.get(ce.observatory)), []).append((ce.segment, ce.path)) - for ifos in segsdict: - reference_psd_files = sorted(psd_cache_files[ifos], key = lambda (s, p): s) - ref_psd_file_num = 0 - for seg in segsdict[ifos]: - while int(reference_psd_files[ref_psd_file_num][0][0]) < int(seg[0]): - ref_psd_file_num += 1 - psd_nodes[(ifos, seg)] = reference_psd_files[ref_psd_file_num][1] - ref_psd_parent_nodes = [] - return psd_nodes, ref_psd_parent_nodes - -def mass_model_layer(dag, jobs, parent_nodes, instruments, options, seg, psd): - """mass model node - """ - if options.mass_model_file is None: - # choose, arbitrarily, the lowest instrument in alphabetical order - model_file_name = dagparts.T050017_filename(instruments, 'ALL_MASS_MODEL', seg, '.h5', path = jobs['model'].output_path) - model_node = dagparts.DAGNode(jobs['model'], dag, - input_files = {"template-bank": options.template_bank, "reference-psd": psd}, - opts = {"model":options.mass_model}, - output_files = {"output": model_file_name}, - parent_nodes = parent_nodes - ) - return [model_node], model_file_name - else: - return [], options.mass_model_file - -def merge_cluster_layer(dag, jobs, parent_nodes, db, db_cache, sqlfile, input_files=None): - """merge and cluster from sqlite database - """ - if input_files: - input_ = {"": input_files} - else: - input_ = {} - - # Merge database into chunks - sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = parent_nodes, - opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()}, - input_files = input_, - input_cache_files = {"input-cache": db_cache}, - output_files = {"database":db}, - input_cache_file_name = os.path.basename(db).replace('.sqlite','.cache') - ) - - # cluster database - return dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode], - opts = {"sql-file": sqlfile, "tmp-space": 
dagparts.condor_scratch_space()}, - input_files = {"": db} - ) - -def marginalize_layer(dag, jobs, svd_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set, model_node, model_file, ref_psd, svd_dtdphi_map): - instruments = "".join(sorted(instrument_set)) - margnodes = {} - - # NOTE! we rely on there being identical templates in each instrument, - # so we just take one of the values of the svd_nodes which are a dictionary - # FIXME, the svd nodes list has to be the same as the sorted keys of - # lloid_output. svd nodes should be made into a dictionary much - # earlier in the code to prevent a mishap - one_ifo_svd_nodes = dict(("%04d" % n, node) for n, node in enumerate( svd_nodes.values()[0])) - - # Here n counts the bins - # FIXME - this is broken for injection dags right now because of marg nodes - # first non-injections, which will get skipped if this is an injections-only run - for bin_key in sorted(lloid_output[None].keys()): - outputs = lloid_output[None][bin_key] - diststats = lloid_diststats[bin_key] - inputs = [o[0] for o in outputs] - parents = dagparts.flatten([o[1] for o in outputs]) - rankfile = functools.partial(get_rank_file, instruments, boundary_seg, bin_key) - - # FIXME we keep this here in case we someday want to have a - # mass bin dependent prior, but it really doesn't matter for - # the time being. - priornode = dagparts.DAGNode(jobs['createPriorDistStats'], dag, - parent_nodes = [one_ifo_svd_nodes[bin_key]] + model_node or [], - opts = { - "instrument": instrument_set, - "background-prior": 1, - "min-instruments": options.min_instruments, - "coincidence-threshold":options.coincidence_threshold, - "df": "bandwidth" - }, - input_files = { - "svd-file": one_ifo_svd_nodes[bin_key].output_files["write-svd"], - "mass-model-file": model_file, - "dtdphi-file": svd_dtdphi_map[bin_key], - "psd-xml": ref_psd - }, - output_files = {"write-likelihood": rankfile('CREATE_PRIOR_DIST_STATS', job=jobs['createPriorDistStats'])} - ) - # Create a file that has the priors *and* all of the diststats - # for a given bin marginalized over time. 
This is all that will - # be needed to compute the likelihood - diststats_per_bin_node = dagparts.DAGNode(jobs['marginalize'], dag, - parent_nodes = [priornode] + parents, - opts = {"marginalize": "ranking-stat"}, - input_cache_files = {"likelihood-cache": diststats + [priornode.output_files["write-likelihood"]]}, - output_files = {"output": rankfile('MARG_DIST_STATS', job=jobs['marginalize'])}, - input_cache_file_name = rankfile('MARG_DIST_STATS') - ) - - margnodes[bin_key] = diststats_per_bin_node - - return margnodes - -def calc_rank_pdf_layer(dag, jobs, marg_nodes, options, boundary_seg, instrument_set): - rankpdf_nodes = [] - rankpdf_zerolag_nodes = [] - instruments = "".join(sorted(instrument_set)) - - # Here n counts the bins - for bin_key in sorted(marg_nodes.keys()): - rankfile = functools.partial(get_rank_file, instruments, boundary_seg, bin_key) - - calcranknode = dagparts.DAGNode(jobs['calcRankPDFs'], dag, - parent_nodes = [marg_nodes[bin_key]], - opts = {"ranking-stat-samples":options.ranking_stat_samples}, - input_files = {"": marg_nodes[bin_key].output_files["output"]}, - output_files = {"output": rankfile('CALC_RANK_PDFS', job=jobs['calcRankPDFs'])}, - ) - - calcrankzerolagnode = dagparts.DAGNode(jobs['calcRankPDFsWithZerolag'], dag, - parent_nodes = [marg_nodes[bin_key]], - opts = {"add-zerolag-to-background": "", "ranking-stat-samples": options.ranking_stat_samples}, - input_files = {"": marg_nodes[bin_key].output_files["output"]}, - output_files = {"output": rankfile('CALC_RANK_PDFS_WZL', job=jobs['calcRankPDFsWithZerolag'])}, - ) - - rankpdf_nodes.append(calcranknode) - rankpdf_zerolag_nodes.append(calcrankzerolagnode) - - return rankpdf_nodes, rankpdf_zerolag_nodes - -def likelihood_layer(dag, jobs, marg_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set): - likelihood_nodes = {} - instruments = "".join(sorted(instrument_set)) - - # non-injection jobs - for bin_key in sorted(lloid_output[None].keys()): - outputs = lloid_output[None][bin_key] - diststats = lloid_diststats[bin_key] - inputs = [o[0] for o in outputs] - - # (input files for next job, dist stat files, parents for next job) - likelihood_nodes[None, bin_key] = (inputs, marg_nodes[bin_key].output_files["output"], [marg_nodes[bin_key]]) - - # injection jobs - for inj in options.injections: - lloid_nodes = lloid_output[sim_tag_from_inj_file(inj)] - for bin_key in sorted(lloid_nodes.keys()): - outputs = lloid_nodes[bin_key] - diststats = lloid_diststats[bin_key] - if outputs is not None: - inputs = [o[0] for o in outputs] - parents = dagparts.flatten([o[1] for o in outputs]) - - parents.append(marg_nodes[bin_key]) - likelihood_url = marg_nodes[bin_key].output_files["output"] - likelihood_nodes[sim_tag_from_inj_file(inj), bin_key] = (inputs, likelihood_url, parents) - - return likelihood_nodes - -def sql_cluster_and_merge_layer(dag, jobs, likelihood_nodes, ligolw_add_nodes, options, instruments): - num_chunks = 100 - innodes = {} - - # after assigning the likelihoods cluster and merge by sub bank and whether or not it was an injection run - for (sim_tag, bin_key), (inputs, likelihood_url, parents) in sorted(likelihood_nodes.items()): - db = inputs_to_db(jobs, inputs, job_type = 'toSqliteNoCache') - xml = inputs_to_db(jobs, inputs, job_type = 'ligolwAdd').replace(".sqlite", ".xml.gz") - snr_cluster_sql_file = options.snr_cluster_sql_file if sim_tag is None else options.injection_snr_cluster_sql_file - cluster_sql_file = options.cluster_sql_file if sim_tag is None else 
options.injection_sql_file - - # cluster sub banks - cluster_node = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = parents, - opts = {"sql-file": snr_cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()}, - input_files = {"":inputs} - ) - - # merge sub banks - merge_node = dagparts.DAGNode(jobs['ligolwAdd'], dag, parent_nodes = [cluster_node], - input_files = {"":inputs}, - output_files = {"output":xml} - ) - - # cluster and simplify sub banks - cluster_node = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [merge_node], - opts = {"sql-file": snr_cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()}, - input_files = {"":xml} - ) - - # assign likelihoods - likelihood_node = dagparts.DAGNode(jobs['calcLikelihood'], dag, - parent_nodes = [cluster_node], - opts = {"tmp-space":dagparts.condor_scratch_space()}, - input_files = {"likelihood-url":likelihood_url, "": xml} - ) - - sqlitenode = dagparts.DAGNode(jobs['toSqliteNoCache'], dag, parent_nodes = [likelihood_node], - opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()}, - input_files = {"":xml}, - output_files = {"database":db}, - ) - sqlitenode = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode], - opts = {"sql-file": cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()}, - input_files = {"":db} - ) - - innodes.setdefault(sim_tag_from_inj_file(sim_tag) if sim_tag else None, []).append(sqlitenode) - - # make sure outnodes has a None key, even if its value is an empty list - # FIXME injection dag is broken - innodes.setdefault(None, []) - - if options.vetoes is None: - vetoes = [] - else: - vetoes = [options.vetoes] - - chunk_nodes = [] - dbs_to_delete = [] - # Process the chirp mass bins in chunks to paralellize the merging process - for chunk, nodes in enumerate(dagparts.groups(innodes[None], num_chunks)): - try: - dbs = [node.input_files[""] for node in nodes] - parents = nodes - - except AttributeError: - # analysis started at merger step but seeded by lloid files which - # have already been merged into one file per background - # bin, thus the analysis will begin at this point - dbs = nodes - parents = [] - - dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs] - noninjdb = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite', path = jobs['toSqlite'].output_path) - - # Merge and cluster the final non injection database - noninjsqlitenode = merge_cluster_layer(dag, jobs, parents, noninjdb, dbs, options.cluster_sql_file) - chunk_nodes.append(noninjsqlitenode) - dbs_to_delete.append(noninjdb) - - # Merge the final non injection database - outnodes = [] - injdbs = [] - if options.non_injection_db: #### injection-only run - noninjdb = options.non_injection_db - else: - final_nodes = [] - for chunk, nodes in enumerate(dagparts.groups(innodes[None], num_chunks)): - noninjdb = dagparts.T050017_filename(instruments, 'PART_LLOID_CHUNK_%04d' % chunk, boundary_seg, '.sqlite') - - # cluster the final non injection database - noninjsqlitenode = merge_cluster_layer(dag, jobs, nodes, noninjdb, [node.input_files[""] for node in nodes], options.cluster_sql_file) - final_nodes.append(noninjsqlitenode) - - input_files = (vetoes + [options.frame_segments_file]) - input_cache_files = [node.input_files[""] for node in final_nodes] - noninjdb = dagparts.T050017_filename(instruments, 'ALL_LLOID', boundary_seg, '.sqlite') - noninjsqlitenode = merge_cluster_layer(dag, jobs, final_nodes, noninjdb, 
input_cache_files, options.cluster_sql_file, input_files=input_files) - - cpnode = dagparts.DAGNode(jobs['cp'], dag, parent_nodes = [noninjsqlitenode], - input_files = {"":"%s %s" % (noninjdb, noninjdb.replace('ALL_LLOID', 'ALL_LLOID_WZL'))} - ) - - outnodes.append(cpnode) - - if options.injections: - iterable_injections = options.injections - else: - iterable_injections = options.injections_for_merger - - for injections in iterable_injections: - # extract only the nodes that were used for injections - chunk_nodes = [] - - for chunk, injnodes in enumerate(dagparts.groups(innodes[sim_tag_from_inj_file(injections)], num_chunks)): - try: - dbs = [injnode.input_files[""] for injnode in injnodes] - parents = injnodes - except AttributeError: - dbs = injnodes - parents = [] - - # Setup the final output names, etc. - dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs] - injdb = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite', path = jobs['toSqlite'].output_path) - - # merge and cluster - clusternode = merge_cluster_layer(dag, jobs, parents, injdb, dbs, options.cluster_sql_file) - chunk_nodes.append(clusternode) - dbs_to_delete.append(injdb) - - - final_nodes = [] - for chunk, injnodes in enumerate(dagparts.groups(innodes[sim_tag_from_inj_file(injections)], num_chunks)): - # Setup the final output names, etc. - injdb = dagparts.T050017_filename(instruments, 'PART_LLOID_%s_CHUNK_%04d' % (sim_tag_from_inj_file(injections), chunk), boundary_seg, '.sqlite') - - # merge and cluster - clusternode = merge_cluster_layer(dag, jobs, injnodes, injdb, [node.input_files[""] for node in injnodes], options.cluster_sql_file) - final_nodes.append(clusternode) - - # Setup the final output names, etc. 
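The output names assembled here follow the LIGO T050017 convention, IFOS-DESCRIPTION-GPSSTART-DURATION.ext; a hand-rolled sketch of that pattern with hypothetical values (not the dagparts implementation itself):

ifos, description = 'H1L1V1', 'ALL_LLOID_BNS_INJ'    # hypothetical
gps_start, duration = 1239000000, 604800             # hypothetical
print('%s-%s-%d-%d.sqlite' % (ifos, description, gps_start, duration))
# -> H1L1V1-ALL_LLOID_BNS_INJ-1239000000-604800.sqlite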
- injdb = dagparts.T050017_filename(instruments, 'ALL_LLOID_%s' % sim_tag_from_inj_file(injections), boundary_seg, '.sqlite') - injdbs.append(injdb) - injxml = injdb.replace('.sqlite','.xml.gz') - - xml_input = injxml - - # merge and cluster - parent_nodes = final_nodes + ligolw_add_nodes - input_files = (vetoes + [options.frame_segments_file, injections]) - input_cache_files = [node.input_files[""] for node in final_nodes] - clusternode = merge_cluster_layer(dag, jobs, parent_nodes, injdb, input_cache_files, options.cluster_sql_file, input_files=input_files) - - clusternode = dagparts.DAGNode(jobs['toXML'], dag, parent_nodes = [clusternode], - opts = {"tmp-space":dagparts.condor_scratch_space()}, - output_files = {"extract":injxml}, - input_files = {"database":injdb} - ) - - inspinjnode = dagparts.DAGNode(jobs['ligolwInspinjFind'], dag, parent_nodes = [clusternode], - opts = {"time-window":0.9}, - input_files = {"":injxml} - ) - - sqlitenode = dagparts.DAGNode(jobs['toSqliteNoCache'], dag, parent_nodes = [inspinjnode], - opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()}, - output_files = {"database":injdb}, - input_files = {"":xml_input} - ) - - cpnode = dagparts.DAGNode(jobs['cp'], dag, parent_nodes = [sqlitenode], - input_files = {"":"%s %s" % (injdb, injdb.replace('ALL_LLOID', 'ALL_LLOID_WZL'))} - ) - - outnodes.append(cpnode) - - return injdbs, noninjdb, outnodes, dbs_to_delete - -def final_marginalize_layer(dag, jobs, rankpdf_nodes, rankpdf_zerolag_nodes): - ranknodes = [rankpdf_nodes, rankpdf_zerolag_nodes] - margjobs = [jobs['marginalize'], jobs['marginalizeWithZerolag']] - margfiles = [options.marginalized_likelihood_file, options.marginalized_likelihood_file] - filesuffixs = ['', '_with_zerolag'] - - margnum = 16 - all_margcache = [] - all_margnodes = [] - final_margnodes = [] - for nodes, job, margfile, filesuffix in zip(ranknodes, margjobs, margfiles, filesuffixs): - try: - margin = [node.output_files["output"] for node in nodes] - parents = nodes - except AttributeError: ### analysis started at merger step - margin = nodes - parents = [] - - margnodes = [] - margcache = [] - - # split up the marginalization into groups of 10 - # FIXME: is it actually groups of 10 or groups of 16? 
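# note: the loop below groups ranking-stat files by margnum = 16, so the "groups of 10" wording in the comment above is stale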
- for margchunk in dagparts.groups(margin, margnum): - if nodes: - marg_ce = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in margchunk] - margcache.append(dagparts.group_T050017_filename_from_T050017_files(marg_ce, '.xml.gz', path = job.output_path)) - margnodes.append(dagparts.DAGNode(job, dag, parent_nodes = parents, - opts = {"marginalize": "ranking-stat-pdf"}, - output_files = {"output": margcache[-1]}, - input_cache_files = {"likelihood-cache": margchunk}, - input_cache_file_name = os.path.basename(margcache[-1]).replace('.xml.gz','.cache') - )) - - all_margcache.append(margcache) - all_margnodes.append(margnodes) - - if not options.marginalized_likelihood_file: ### not an injection-only run - for nodes, job, margnodes, margcache, margfile, filesuffix in zip(ranknodes, margjobs, all_margnodes, all_margcache, margfiles, filesuffixs): - final_margnodes.append(dagparts.DAGNode(job, dag, parent_nodes = margnodes, - opts = {"marginalize": "ranking-stat-pdf"}, - output_files = {"output": "marginalized_likelihood%s.xml.gz"%filesuffix}, - input_cache_files = {"likelihood-cache": margcache}, - input_cache_file_name = "marginalized_likelihood%s.cache"%filesuffix - )) - - return final_margnodes, dagparts.flatten(all_margcache) - -def compute_far_layer(dag, jobs, margnodes, injdbs, noninjdb, final_sqlite_nodes): - """compute FAPs and FARs - """ - margfiles = [options.marginalized_likelihood_file, options.marginalized_likelihood_file] - filesuffixs = ['', '_with_zerolag'] - - for margnode, margfile, filesuffix in zip(margnodes, margfiles, filesuffixs): - if options.marginalized_likelihood_file: ### injection-only run - parents = final_sqlite_nodes - marginalized_likelihood_file = margfile - - else: - parents = [margnode] + final_sqlite_nodes - marginalized_likelihood_file = margnode.output_files["output"] - - farnode = dagparts.DAGNode(jobs['ComputeFarFromSnrChisqHistograms'], dag, parent_nodes = parents, - opts = {"tmp-space":dagparts.condor_scratch_space()}, - input_files = { - "background-bins-file": marginalized_likelihood_file, - "injection-db": [injdb.replace('ALL_LLOID', 'ALL_LLOID_WZL') for injdb in injdbs] if 'zerolag' in filesuffix else injdbs, - "non-injection-db": noninjdb.replace('ALL_LLOID', 'ALL_LLOID_WZL') if 'zerolag' in filesuffix else noninjdb - } - ) - - if 'zerolag' not in filesuffix: - outnode = farnode - - return outnode - -def horizon_dist_layer(dag, jobs, psd_nodes, options, boundary_seg, output_dir): - """calculate horizon distance - """ - dagparts.DAGNode(jobs['horizon'], dag, - parent_nodes = psd_nodes.values(), - input_files = {"":[node.output_files["write-psd"] for node in psd_nodes.values()]}, - output_files = {"":dagparts.T050017_filename(instruments, "HORIZON", boundary_seg, '.png', path = output_dir)} - ) - -def summary_page_layer(dag, jobs, plotnodes, options, boundary_seg, injdbs): - """create a summary page - """ - output_user_tags = ["ALL_LLOID_COMBINED", "PRECESSION_LLOID_COMBINED"] - output_user_tags.extend([injdb.replace(".sqlite","").split("-")[1] for injdb in injdbs]) - output_user_tags.extend([injdb.replace(".sqlite","").split("-")[1].replace("ALL_LLOID", "PRECESSION_LLOID") for injdb in injdbs]) - - dagparts.DAGNode(jobs['summaryPage'], dag, parent_nodes = plotnodes, - opts = { - "title":"gstlal-%d-%d-closed-box" % (int(boundary_seg[0]), int(boundary_seg[1])), - "webserver-dir":options.web_dir, - "glob-path":output_dir, - "output-user-tag":output_user_tags - } - ) - 
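For reference, the ht-gate-threshold interpolation used by the inspiral layers (the get_threshold_values() helper removed above) reduces to the following self-contained sketch; the spec string and chirp mass are hypothetical values, not pipeline defaults:

def linear_gate_threshold(spec, bank_max_mchirp):
    # spec has the form "mchirp_min:thresh_min-mchirp_max:thresh_max", as passed to --ht-gate-threshold-linear
    mchirp_min, thresh_min, mchirp_max, thresh_max = [float(y) for x in spec.split('-') for y in x.split(':')]
    slope = (thresh_max - thresh_min) / (mchirp_max - mchirp_min)
    return slope * (bank_max_mchirp - mchirp_min) + thresh_min

# e.g. with "0.8:15-45:100", a bank whose heaviest template has mchirp 20 gets a
# gate threshold of 15 + (100 - 15) / (45 - 0.8) * (20 - 0.8) ~= 51.9
print(linear_gate_threshold('0.8:15-45:100', 20.0))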
#---------------------------------------------------------- ### main @@ -1458,24 +372,24 @@ if __name__ == '__main__': dag.add_maxjobs_category("INSPIRAL", options.max_inspiral_jobs) # generate xml integrity checker (if requested) and pre-script to back up data - set_up_scripts(options) + inspiral_pipe.set_up_scripts(options) # Get mchirp boundaries of banks, maximum duration of templates, and analysis segments if options.bank_cache or options.psd_cache: if options.bank_cache: - template_mchirp_dict, bank_cache, max_time = get_bank_params(options) + template_mchirp_dict, bank_cache, max_time = inspiral_pipe.get_bank_params(options) instrument_set = bank_cache.keys() elif options.psd_cache: template_mchirp_dict, svd_nodes, max_time = inspiral_pipe.get_svd_bank_params(options.svd_bank_cache) instrument_set = svd_nodes.keys() - segsdict = analysis_segments(set(instrument_set), detectors.frame_segments, boundary_seg, max_time, options.min_instruments) + segsdict = inspiral_pipe.analysis_segments(set(instrument_set), detectors.frame_segments, boundary_seg, max_time, options.min_instruments) instruments = "".join(sorted(instrument_set)) if options.psd_cache: ### reference psd jobs - psd_nodes, ref_psd_parent_nodes = inj_psd_layer(segsdict, options) + psd_nodes, ref_psd_parent_nodes = inspiral_pipe.inj_psd_layer(segsdict, options) elif options.lloid_cache: # starting analysis at merger step, nothing to do here @@ -1483,68 +397,68 @@ if __name__ == '__main__': elif options.reference_psd is None: # Compute the PSDs for each segment - psd_nodes = ref_psd_layer(dag, jobs, [], segsdict, channel_dict, options) + psd_nodes = inspiral_pipe.ref_psd_layer(dag, jobs, [], segsdict, channel_dict, options) # plot the horizon distance - horizon_dist_layer(dag, jobs, psd_nodes, options, boundary_seg, output_dir) + inspiral_pipe.horizon_dist_layer(dag, jobs, psd_nodes, options, boundary_seg, output_dir, instruments) # compute the median PSD - median_psd_node = median_psd_layer(dag, jobs, psd_nodes, options) + median_psd_node = inspiral_pipe.median_psd_layer(dag, jobs, psd_nodes, options, boundary_seg, instruments) ref_psd = median_psd_node.output_files["output-name"] ref_psd_parent_nodes = [median_psd_node] else: # load reference PSD - ref_psd = load_reference_psd(options) + ref_psd = inspiral_pipe.load_reference_psd(options) ref_psd_parent_nodes = [] # Calculate Expected SNR jobs if not options.lloid_cache and not options.disable_calc_inj_snr: - ligolw_add_nodes = expected_snr_layer(dag, jobs, ref_psd_parent_nodes, options, num_split_inj_snr_jobs = 100) + ligolw_add_nodes = inspiral_pipe.expected_snr_layer(dag, jobs, ref_psd_parent_nodes, options, num_split_inj_snr_jobs = 100) else: ligolw_add_nodes = [] if options.bank_cache: # Compute SVD banks - svd_nodes, template_mchirp_dict, svd_dtdphi_map = svd_layer(dag, jobs, ref_psd_parent_nodes, ref_psd, bank_cache, options, boundary_seg, template_mchirp_dict) + svd_nodes, template_mchirp_dict, svd_dtdphi_map = inspiral_pipe.svd_layer(dag, jobs, ref_psd_parent_nodes, ref_psd, bank_cache, options, boundary_seg, output_dir, template_mchirp_dict) # mass model - model_node, model_file = mass_model_layer(dag, jobs, ref_psd_parent_nodes, instruments, options, boundary_seg, ref_psd) + model_node, model_file = inspiral_pipe.mass_model_layer(dag, jobs, ref_psd_parent_nodes, instruments, options, boundary_seg, ref_psd) else: model_node = None model_file = options.mass_model_file if not options.lloid_cache: # Inspiral jobs by segment - inspiral_nodes, lloid_output, 
lloid_diststats = inspiral_layer(dag, jobs, svd_nodes, segsdict, options, channel_dict, template_mchirp_dict) + inspiral_nodes, lloid_output, lloid_diststats = inspiral_pipe.inspiral_layer(dag, jobs, psd_nodes, svd_nodes, segsdict, options, channel_dict, template_mchirp_dict) # marginalize jobs - marg_nodes = marginalize_layer(dag, jobs, svd_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set, model_node, model_file, ref_psd, svd_dtdphi_map) + marg_nodes = inspiral_pipe.marginalize_layer(dag, jobs, svd_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set, model_node, model_file, ref_psd, svd_dtdphi_map) # calc rank PDF jobs - rankpdf_nodes, rankpdf_zerolag_nodes = calc_rank_pdf_layer(dag, jobs, marg_nodes, options, boundary_seg, instrument_set) + rankpdf_nodes, rankpdf_zerolag_nodes = inspiral_pipe.calc_rank_pdf_layer(dag, jobs, marg_nodes, options, boundary_seg, instrument_set) # final marginalization step - final_marg_nodes, margfiles_to_delete = final_marginalize_layer(dag, jobs, rankpdf_nodes, rankpdf_zerolag_nodes) + final_marg_nodes, margfiles_to_delete = inspiral_pipe.final_marginalize_layer(dag, jobs, rankpdf_nodes, rankpdf_zerolag_nodes, options) # likelihood jobs - likelihood_nodes = likelihood_layer(dag, jobs, marg_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set) + likelihood_nodes = inspiral_pipe.likelihood_layer(dag, jobs, marg_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set) # Setup clustering and/or merging # after all of the likelihood ranking and preclustering is finished put everything into single databases based on the injection file (or lack thereof) - injdbs, noninjdb, final_sqlite_nodes, dbs_to_delete = sql_cluster_and_merge_layer(dag, jobs, likelihood_nodes, ligolw_add_nodes, options, instruments) + injdbs, noninjdb, final_sqlite_nodes, dbs_to_delete = inspiral_pipe.sql_cluster_and_merge_layer(dag, jobs, likelihood_nodes, ligolw_add_nodes, options, boundary_seg, instruments) # Compute FAR - farnode = compute_far_layer(dag, jobs, final_marg_nodes, injdbs, noninjdb, final_sqlite_nodes) + farnode = inspiral_pipe.compute_far_layer(dag, jobs, final_marg_nodes, injdbs, noninjdb, final_sqlite_nodes, options) # make summary plots - plotnodes = summary_plot_layer(dag, jobs, farnode, options, injdbs, noninjdb) + plotnodes = inspiral_pipe.summary_plot_layer(dag, jobs, farnode, options, injdbs, noninjdb, output_dir) # make a web page - summary_page_layer(dag, jobs, plotnodes, options, boundary_seg, injdbs) + inspiral_pipe.summary_page_layer(dag, jobs, plotnodes, options, boundary_seg, injdbs, output_dir) # rm intermediate merger products - clean_merger_products_layer(dag, jobs, plotnodes, dbs_to_delete, margfiles_to_delete) + inspiral_pipe.clean_merger_products_layer(dag, jobs, plotnodes, dbs_to_delete, margfiles_to_delete) # # generate DAG files diff --git a/gstlal-inspiral/python/inspiral_pipe.py b/gstlal-inspiral/python/inspiral_pipe.py index 4e982b8c0cb6d72d048ff09287aa613e2afd5f03..bbeafafaf0c4fee5a59fc4377c195ac899f1eaff 100644 --- a/gstlal-inspiral/python/inspiral_pipe.py +++ b/gstlal-inspiral/python/inspiral_pipe.py @@ -1,4 +1,5 @@ # Copyright (C) 2013--2014 Kipp Cannon, Chad Hanna +# Copyright (C) 2019 Patrick Godwin # # This program is free software; you can redistribute it and/or modify it # under the terms of the GNU General Public License as published by the @@ -36,12 +37,961 @@ # - In inspiral_pipe.py Fix the InsiralJob.___init___: fix the arguments # - On line 
201, fix the comment or explain what the comment is meant to be -import socket, copy, doctest +# +# imports +# + +import copy +import doctest +import functools +import itertools +import os +import socket +import stat + +import lal.series +from lal.utils import CacheEntry + from ligo import segments from ligo.lw import lsctables, ligolw from ligo.lw import utils as ligolw_utils + +from gstlal import dagparts +from gstlal import datasource +from gstlal import inspiral from gstlal import svd_bank -from lal.utils import CacheEntry + + +# +# LIGOLW initialization +# + + +class LIGOLWContentHandler(ligolw.LIGOLWContentHandler): + pass +lsctables.use_in(LIGOLWContentHandler) + + +# +# DAG layers +# + + +def ref_psd_layer(dag, jobs, parent_nodes, segsdict, channel_dict, options): + psd_nodes = {} + for ifos in segsdict: + this_channel_dict = dict((k, channel_dict[k]) for k in ifos if k in channel_dict) + for seg in segsdict[ifos]: + psd_path = subdir_path([jobs['refPSD'].output_path, str(int(seg[0]))[:5]]) + psd_nodes[(ifos, seg)] = dagparts.DAGNode( + jobs['refPSD'], + dag, + parent_nodes = parent_nodes, + opts = { + "gps-start-time":int(seg[0]), + "gps-end-time":int(seg[1]), + "data-source":"frames", + "channel-name":datasource.pipeline_channel_list_from_channel_dict(this_channel_dict, ifos = ifos), + "psd-fft-length":options.psd_fft_length, + "frame-segments-name": options.frame_segments_name + }, + input_files = { + "frame-cache":options.frame_cache, + "frame-segments-file":options.frame_segments_file + }, + output_files = { + "write-psd":dagparts.T050017_filename(ifos, "REFERENCE_PSD", seg, '.xml.gz', path = psd_path) + }, + ) + + # Make the reference PSD cache + # FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache + with open('reference_psd.cache', "w") as output_cache_file: + for node in psd_nodes.values(): + output_cache_file.write("%s\n" % CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(node.output_files["write-psd"]))) + + return psd_nodes + +def median_psd_layer(dag, jobs, parent_nodes, options, boundary_seg, instruments): + gpsmod5 = str(int(boundary_seg[0]))[:5] + median_psd_path = subdir_path([jobs['medianPSD'].output_path, gpsmod5]) + + # FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache + median_psd_nodes = [] + for chunk, nodes in enumerate(dagparts.groups(parent_nodes.values(), 50)): + median_psd_node = \ + dagparts.DAGNode(jobs['medianPSD'], dag, + parent_nodes = parent_nodes.values(), + input_files = {"": [node.output_files["write-psd"] for node in nodes]}, + output_files = {"output-name": dagparts.T050017_filename(instruments, "REFERENCE_PSD_CHUNK_%04d" % chunk, boundary_seg, '.xml.gz', path = median_psd_path)} + ) + median_psd_nodes.append(median_psd_node) + + median_psd_node = \ + dagparts.DAGNode(jobs['medianPSD'], dag, + parent_nodes = median_psd_nodes, + input_files = {"": [node.output_files["output-name"] for node in median_psd_nodes]}, + output_files = {"output-name": dagparts.T050017_filename(instruments, "REFERENCE_PSD", boundary_seg, '.xml.gz', path = subdir_path([jobs['medianPSD'].output_path, gpsmod5]))} + ) + return median_psd_node + +def svd_layer(dag, jobs, parent_nodes, psd, bank_cache, options, seg, output_dir, template_mchirp_dict): + svd_nodes = {} + new_template_mchirp_dict = {} + svd_dtdphi_map = {} + for ifo, list_of_svd_caches in bank_cache.items(): + bin_offset = 0 + for j, svd_caches in enumerate(list_of_svd_caches): + svd_caches = map(CacheEntry, open(svd_caches)) + for i, individual_svd_cache in 
enumerate(ce.path for ce in svd_caches): + # First sort out the clipleft, clipright options + clipleft = [] + clipright = [] + ids = [] + mchirp_interval = (float("inf"), 0) + individual_svd_cache = map(CacheEntry, open(individual_svd_cache)) + for n, f in enumerate(ce.path for ce in individual_svd_cache): + # handle template bank clipping + clipleft.append(options.overlap[j] / 2) + clipright.append(options.overlap[j] / 2) + ids.append("%d_%d" % (i+bin_offset, n)) + if f in template_mchirp_dict: + mchirp_interval = (min(mchirp_interval[0], template_mchirp_dict[f][0]), max(mchirp_interval[1], template_mchirp_dict[f][1])) + svd_dtdphi_map["%04d" % (i+bin_offset)] = options.dtdphi_file[j] + + svd_bank_name = dagparts.T050017_filename(ifo, '%04d_SVD' % (i+bin_offset,), seg, '.xml.gz', path = jobs['svd'].output_path) + if '%04d' % (i+bin_offset,) not in new_template_mchirp_dict and mchirp_interval != (float("inf"), 0): + new_template_mchirp_dict['%04d' % (i+bin_offset,)] = mchirp_interval + + svdnode = dagparts.DAGNode( + jobs['svd'], + dag, + parent_nodes = parent_nodes, + opts = { + "svd-tolerance":options.tolerance, + "flow":options.flow[j], + "sample-rate":options.sample_rate, + "clipleft":clipleft, + "clipright":clipright, + "samples-min":options.samples_min[j], + "samples-max-256":options.samples_max_256, + "samples-max-64":options.samples_max_64, + "samples-max":options.samples_max, + "autocorrelation-length":options.autocorrelation_length, + "bank-id":ids, + "identity-transform":options.identity_transform, + "ortho-gate-fap":0.5 + }, + input_files = {"reference-psd":psd}, + input_cache_files = {"template-bank-cache":[ce.path for ce in individual_svd_cache]}, + input_cache_file_name = os.path.basename(svd_bank_name).replace(".xml.gz", ".cache"), + output_files = {"write-svd":svd_bank_name}, + ) + + # impose a priority to help with depth first submission + svdnode.set_priority(99) + svd_nodes.setdefault(ifo, []).append(svdnode) + bin_offset += i+1 + + # Plot template/svd bank jobs + primary_ifo = bank_cache.keys()[0] + dagparts.DAGNode( + jobs['plotBanks'], + dag, + parent_nodes = sum(svd_nodes.values(),[]), + opts = {"plot-template-bank":"", "output-dir": output_dir}, + input_files = {"template-bank-file":options.template_bank}, + ) + + return svd_nodes, new_template_mchirp_dict, svd_dtdphi_map + + +def inspiral_layer(dag, jobs, psd_nodes, svd_nodes, segsdict, options, channel_dict, template_mchirp_dict): + inspiral_nodes = {} + for ifos in segsdict: + # FIXME: handles more than 3 ifos with same cpu/memory requests + inspiral_name = 'gstlalInspiral%dIFO' % min(len(ifos), 3) + inspiral_inj_name = 'gstlalInspiralInj%dIFO' % min(len(ifos), 3) + + # setup dictionaries to hold the inspiral nodes + inspiral_nodes[(ifos, None)] = {} + ignore = {} + injection_files = [] + for injections in options.injections: + min_chirp_mass, max_chirp_mass, injections = injections.split(':') + injection_files.append(injections) + min_chirp_mass, max_chirp_mass = float(min_chirp_mass), float(max_chirp_mass) + inspiral_nodes[(ifos, sim_tag_from_inj_file(injections))] = {} + ignore[injections] = [] + for bgbin_index, bounds in sorted(template_mchirp_dict.items(), key = lambda (k,v): int(k)): + if max_chirp_mass <= bounds[0]: + ignore[injections].append(int(bgbin_index)) + # NOTE putting a break here assumes that the min chirp mass + # in a subbank increases with bin number, i.e. XXXX+1 has a + # greater minimum chirpmass than XXXX, for all XXXX. 
Note + # that the reverse is not true, bin XXXX+1 may have a lower + # max chirpmass than bin XXXX. + elif min_chirp_mass > bounds[1]: + ignore[injections].append(int(bgbin_index)) + + # FIXME choose better splitting? + numchunks = 50 + + # only use a channel dict with the relevant channels + this_channel_dict = dict((k, channel_dict[k]) for k in ifos if k in channel_dict) + + # get the svd bank strings + svd_bank_strings_full = create_svd_bank_strings(svd_nodes, instruments = this_channel_dict.keys()) + + # get a mapping between chunk counter and bgbin for setting priorities + bgbin_chunk_map = {} + + for seg in segsdict[ifos]: + if injection_files: + output_seg_inj_path = subdir_path([jobs[inspiral_inj_name].output_path, str(int(seg[0]))[:5]]) + + if jobs[inspiral_name] is None: + # injection-only run + inspiral_nodes[(ifos, None)].setdefault(seg, [None]) + + else: + output_seg_path = subdir_path([jobs[inspiral_name].output_path, str(int(seg[0]))[:5]]) + for chunk_counter, svd_bank_strings in enumerate(dagparts.groups(svd_bank_strings_full, numchunks)): + bgbin_indices = ['%04d' % (i + numchunks * chunk_counter,) for i,s in enumerate(svd_bank_strings)] + # setup output names + output_paths = [subdir_path([output_seg_path, bgbin_indices[i]]) for i, s in enumerate(svd_bank_strings)] + output_names = [dagparts.T050017_filename(ifos, '%s_LLOID' % idx, seg, '.xml.gz', path = path) for idx, path in zip(bgbin_indices, output_paths)] + dist_stat_names = [dagparts.T050017_filename(ifos, '%s_DIST_STATS' % idx, seg, '.xml.gz', path = path) for idx, path in zip(bgbin_indices, output_paths)] + + for bgbin in bgbin_indices: + bgbin_chunk_map.setdefault(bgbin, chunk_counter) + + # Calculate the appropriate ht-gate-threshold values according to the scale given + threshold_values = get_threshold_values(template_mchirp_dict, bgbin_indices, svd_bank_strings, options) + + # non injection node + noninjnode = dagparts.DAGNode(jobs[inspiral_name], dag, + parent_nodes = sum((svd_node_list[numchunks*chunk_counter:numchunks*(chunk_counter+1)] for svd_node_list in svd_nodes.values()),[]), + opts = { + "psd-fft-length":options.psd_fft_length, + "ht-gate-threshold":threshold_values, + "frame-segments-name":options.frame_segments_name, + "gps-start-time":int(seg[0]), + "gps-end-time":int(seg[1]), + "channel-name":datasource.pipeline_channel_list_from_channel_dict(this_channel_dict), + "tmp-space":dagparts.condor_scratch_space(), + "track-psd":"", + "control-peak-time":options.control_peak_time, + "coincidence-threshold":options.coincidence_threshold, + "singles-threshold":options.singles_threshold, + "fir-stride":options.fir_stride, + "data-source":"frames", + "local-frame-caching":"", + "min-instruments":options.min_instruments, + "reference-likelihood-file":options.reference_likelihood_file + }, + input_files = { + "time-slide-file":options.time_slide_file, + "frame-cache":options.frame_cache, + "frame-segments-file":options.frame_segments_file, + "reference-psd":psd_nodes[(ifos, seg)].output_files["write-psd"], + "blind-injections":options.blind_injections, + "veto-segments-file":options.vetoes, + }, + input_cache_files = {"svd-bank-cache":svd_bank_cache_maker(svd_bank_strings)}, + output_cache_files = { + "output-cache":output_names, + "ranking-stat-output-cache":dist_stat_names + } + ) + + # Set a post script to check for file integrity + if options.gzip_test: + noninjnode.set_post_script("gzip_test.sh") + noninjnode.add_post_script_arg(" ".join(output_names + dist_stat_names)) + + # impose a priority to help 
with depth first submission + noninjnode.set_priority(chunk_counter+15) + + inspiral_nodes[(ifos, None)].setdefault(seg, []).append(noninjnode) + + # process injections + for injections in injection_files: + # setup output names + sim_name = sim_tag_from_inj_file(injections) + + bgbin_svd_bank_strings = [bgbin_svdbank for i, bgbin_svdbank in enumerate(zip(sorted(template_mchirp_dict.keys()), svd_bank_strings_full)) if i not in ignore[injections]] + + for chunk_counter, bgbin_list in enumerate(dagparts.groups(bgbin_svd_bank_strings, numchunks)): + bgbin_indices, svd_bank_strings = zip(*bgbin_list) + output_paths = [subdir_path([output_seg_inj_path, bgbin_index]) for bgbin_index in bgbin_indices] + output_names = [dagparts.T050017_filename(ifos, '%s_LLOID_%s' % (idx, sim_name), seg, '.xml.gz', path = path) for idx, path in zip(bgbin_indices, output_paths)] + svd_names = [s for i, s in enumerate(svd_bank_cache_maker(svd_bank_strings, injection = True))] + try: + reference_psd = psd_nodes[(ifos, seg)].output_files["write-psd"] + parents = [svd_node_list[int(bgbin_index)] for svd_node_list in svd_nodes.values() for bgbin_index in bgbin_indices] + except AttributeError: ### injection-only run + reference_psd = psd_nodes[(ifos, seg)] + parents = [] + + svd_files = [CacheEntry.from_T050017(filename) for filename in svd_names] + input_cache_name = dagparts.group_T050017_filename_from_T050017_files(svd_files, '.cache').replace('SVD', 'SVD_%s' % sim_name) + + # Calculate the appropriate ht-gate-threshold values according to the scale given + threshold_values = get_threshold_values(template_mchirp_dict, bgbin_indices, svd_bank_strings, options) + + # setup injection node + # FIXME: handles more than 3 ifos with same cpu/memory requests + injnode = dagparts.DAGNode(jobs[inspiral_inj_name], dag, + parent_nodes = parents, + opts = { + "psd-fft-length":options.psd_fft_length, + "ht-gate-threshold":threshold_values, + "frame-segments-name":options.frame_segments_name, + "gps-start-time":int(seg[0]), + "gps-end-time":int(seg[1]), + "channel-name":datasource.pipeline_channel_list_from_channel_dict(this_channel_dict), + "tmp-space":dagparts.condor_scratch_space(), + "track-psd":"", + "control-peak-time":options.control_peak_time, + "coincidence-threshold":options.coincidence_threshold, + "singles-threshold":options.singles_threshold, + "fir-stride":options.fir_stride, + "data-source":"frames", + "local-frame-caching":"", + "min-instruments":options.min_instruments, + "reference-likelihood-file":options.reference_likelihood_file + }, + input_files = { + "time-slide-file":options.inj_time_slide_file, + "frame-cache":options.frame_cache, + "frame-segments-file":options.frame_segments_file, + "reference-psd":reference_psd, + "veto-segments-file":options.vetoes, + "injections": injections + }, + input_cache_files = {"svd-bank-cache":svd_names}, + input_cache_file_name = input_cache_name, + output_cache_files = {"output-cache":output_names} + ) + # Set a post script to check for file integrity + if options.gzip_test: + injnode.set_post_script("gzip_test.sh") + injnode.add_post_script_arg(" ".join(output_names)) + + # impose a priority to help with depth first submission + if bgbin_chunk_map: + injnode.set_priority(bgbin_chunk_map[bgbin_indices[-1]]+1) + else: + injnode.set_priority(chunk_counter+1) + + inspiral_nodes[(ifos, sim_name)].setdefault(seg, []).append(injnode) + + # Replace mchirplo:mchirphi:inj.xml with inj.xml + options.injections = [inj.split(':')[-1] for inj in options.injections] + + # NOTE: Adapt 
the output of the gstlal_inspiral jobs to be suitable for the remainder of this analysis + lloid_output, lloid_diststats = adapt_gstlal_inspiral_output(inspiral_nodes, options, segsdict) + + return inspiral_nodes, lloid_output, lloid_diststats + + +def expected_snr_layer(dag, jobs, ref_psd_parent_nodes, options, num_split_inj_snr_jobs): + ligolw_add_nodes = [] + for inj in options.injections: + inj_snr_nodes = [] + + inj_splitter_node = dagparts.DAGNode(jobs['injSplitter'], dag, parent_nodes=[], + opts = { + "output-path":jobs['injSplitter'].output_path, + "usertag": sim_tag_from_inj_file(inj.split(":")[-1]), + "nsplit": num_split_inj_snr_jobs + }, + input_files = {"": inj.split(":")[-1]} + ) + inj_splitter_node.set_priority(98) + + # FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache + injection_files = ["%s/%s_INJ_SPLIT_%04d.xml" % (jobs['injSplitter'].output_path, sim_tag_from_inj_file(inj.split(":")[-1]), i) for i in range(num_split_inj_snr_jobs)] + for injection_file in injection_files: + injSNRnode = dagparts.DAGNode(jobs['gstlalInjSnr'], dag, parent_nodes=ref_psd_parent_nodes + [inj_splitter_node], + # FIXME somehow choose the actual flow based on mass? + # max(flow) is chosen for performance not + # correctness hopefully though it will be good + # enough + opts = {"flow":max(options.flow),"fmax":options.fmax}, + input_files = { + "injection-file": injection_file, + "reference-psd-cache": "reference_psd.cache" + } + ) + injSNRnode.set_priority(98) + inj_snr_nodes.append(injSNRnode) + + addnode = dagparts.DAGNode(jobs['ligolwAdd'], dag, parent_nodes=inj_snr_nodes, + input_files = {"": ' '.join(injection_files)}, + output_files = {"output": inj.split(":")[-1]} + ) + + ligolw_add_nodes.append(dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [addnode], + opts = {"sql-file":options.injection_proc_sql_file, "tmp-space":dagparts.condor_scratch_space()}, + input_files = {"":addnode.output_files["output"]} + ) + ) + return ligolw_add_nodes + + +def summary_plot_layer(dag, jobs, farnode, options, injdbs, noninjdb, output_dir): + plotnodes = [] + + ### common plot options + common_plot_opts = { + "segments-name": options.frame_segments_name, + "tmp-space": dagparts.condor_scratch_space(), + "output-dir": output_dir, + "likelihood-file":"post_marginalized_likelihood.xml.gz", + "shrink-data-segments": 32.0, + "extend-veto-segments": 8., + } + sensitivity_opts = { + "output-dir":output_dir, + "tmp-space":dagparts.condor_scratch_space(), + "veto-segments-name":"vetoes", + "bin-by-source-type":"", + "dist-bins":200, + "data-segments-name":"datasegments" + } + + ### plot summary + opts = {"user-tag": "ALL_LLOID_COMBINED", "remove-precession": ""} + opts.update(common_plot_opts) + plotnodes.append(dagparts.DAGNode(jobs['plotSummary'], dag, parent_nodes=[farnode], + opts = opts, + input_files = {"": [noninjdb] + injdbs} + )) + + ### isolated precession plot summary + opts = {"user-tag": "PRECESSION_LLOID_COMBINED", "isolate-precession": "", "plot-group": 1} + opts.update(common_plot_opts) + plotnodes.append(dagparts.DAGNode(jobs['plotSummaryIsolatePrecession'], dag, parent_nodes=[farnode], + opts = opts, + input_files = {"":[noninjdb] + injdbs} + )) + + for injdb in injdbs: + ### individual injection plot summary + opts = {"user-tag": injdb.replace(".sqlite","").split("-")[1], "remove-precession": "", "plot-group": 1} + opts.update(common_plot_opts) + plotnodes.append(dagparts.DAGNode(jobs['plotSnglInjSummary'], dag, parent_nodes=[farnode], + opts = opts, + input_files 
= {"":[noninjdb] + [injdb]} + )) + + ### isolated precession injection plot summary + opts = {"user-tag": injdb.replace(".sqlite","").split("-")[1].replace("ALL_LLOID","PRECESSION_LLOID"), "isolate-precession": "", "plot-group": 1} + opts.update(common_plot_opts) + plotnodes.append(dagparts.DAGNode(jobs['plotSnglInjSummaryIsolatePrecession'], dag, parent_nodes=[farnode], + opts = opts, + input_files = {"":[noninjdb] + [injdb]} + )) + + ### sensitivity plots + opts = {"user-tag": "ALL_LLOID_COMBINED"} + opts.update(sensitivity_opts) + plotnodes.append(dagparts.DAGNode(jobs['plotSensitivity'], dag, parent_nodes=[farnode], + opts = opts, + input_files = {"zero-lag-database": noninjdb, "": injdbs} + )) + + for injdb in injdbs: + opts = {"user-tag": injdb.replace(".sqlite","").split("-")[1]} + opts.update(sensitivity_opts) + plotnodes.append(dagparts.DAGNode(jobs['plotSensitivity'], dag, parent_nodes=[farnode], + opts = opts, + input_files = {"zero-lag-database": noninjdb, "": injdb} + )) + + ### background plots + plotnodes.append(dagparts.DAGNode(jobs['plotBackground'], dag, parent_nodes = [farnode], + opts = {"user-tag":"ALL_LLOID_COMBINED", "output-dir":output_dir}, + input_files = {"":"post_marginalized_likelihood.xml.gz", "database":noninjdb} + )) + + return plotnodes + + +def clean_merger_products_layer(dag, jobs, plotnodes, dbs_to_delete, margfiles_to_delete): + """clean intermediate merger products + """ + for db in dbs_to_delete: + dagparts.DAGNode(jobs['rm'], dag, parent_nodes = plotnodes, + input_files = {"": db} + ) + + for margfile in margfiles_to_delete: + dagparts.DAGNode(jobs['rm'], dag, parent_nodes = plotnodes, + input_files = {"": margfile} + ) + return None + + +def inj_psd_layer(segsdict, options): + psd_nodes = {} + psd_cache_files = {} + for ce in map(CacheEntry, open(options.psd_cache)): + psd_cache_files.setdefault(frozenset(lsctables.instrumentsproperty.get(ce.observatory)), []).append((ce.segment, ce.path)) + for ifos in segsdict: + reference_psd_files = sorted(psd_cache_files[ifos], key = lambda (s, p): s) + ref_psd_file_num = 0 + for seg in segsdict[ifos]: + while int(reference_psd_files[ref_psd_file_num][0][0]) < int(seg[0]): + ref_psd_file_num += 1 + psd_nodes[(ifos, seg)] = reference_psd_files[ref_psd_file_num][1] + ref_psd_parent_nodes = [] + return psd_nodes, ref_psd_parent_nodes + + +def mass_model_layer(dag, jobs, parent_nodes, instruments, options, seg, psd): + """mass model node + """ + if options.mass_model_file is None: + # choose, arbitrarily, the lowest instrument in alphabetical order + model_file_name = dagparts.T050017_filename(instruments, 'ALL_MASS_MODEL', seg, '.h5', path = jobs['model'].output_path) + model_node = dagparts.DAGNode(jobs['model'], dag, + input_files = {"template-bank": options.template_bank, "reference-psd": psd}, + opts = {"model":options.mass_model}, + output_files = {"output": model_file_name}, + parent_nodes = parent_nodes + ) + return [model_node], model_file_name + else: + return [], options.mass_model_file + + +def merge_cluster_layer(dag, jobs, parent_nodes, db, db_cache, sqlfile, input_files=None): + """merge and cluster from sqlite database + """ + if input_files: + input_ = {"": input_files} + else: + input_ = {} + + # Merge database into chunks + sqlitenode = dagparts.DAGNode(jobs['toSqlite'], dag, parent_nodes = parent_nodes, + opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()}, + input_files = input_, + input_cache_files = {"input-cache": db_cache}, + output_files = {"database":db}, + 
input_cache_file_name = os.path.basename(db).replace('.sqlite','.cache') + ) + + # cluster database + return dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode], + opts = {"sql-file": sqlfile, "tmp-space": dagparts.condor_scratch_space()}, + input_files = {"": db} + ) + + +def marginalize_layer(dag, jobs, svd_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set, model_node, model_file, ref_psd, svd_dtdphi_map): + instruments = "".join(sorted(instrument_set)) + margnodes = {} + + # NOTE! we rely on there being identical templates in each instrument, + # so we just take one of the values of the svd_nodes which are a dictionary + # FIXME, the svd nodes list has to be the same as the sorted keys of + # lloid_output. svd nodes should be made into a dictionary much + # earlier in the code to prevent a mishap + one_ifo_svd_nodes = dict(("%04d" % n, node) for n, node in enumerate( svd_nodes.values()[0])) + + # Here n counts the bins + # FIXME - this is broken for injection dags right now because of marg nodes + # first non-injections, which will get skipped if this is an injections-only run + for bin_key in sorted(lloid_output[None].keys()): + outputs = lloid_output[None][bin_key] + diststats = lloid_diststats[bin_key] + inputs = [o[0] for o in outputs] + parents = dagparts.flatten([o[1] for o in outputs]) + rankfile = functools.partial(get_rank_file, instruments, boundary_seg, bin_key) + + # FIXME we keep this here in case we someday want to have a + # mass bin dependent prior, but it really doesn't matter for + # the time being. + priornode = dagparts.DAGNode(jobs['createPriorDistStats'], dag, + parent_nodes = [one_ifo_svd_nodes[bin_key]] + model_node or [], + opts = { + "instrument": instrument_set, + "background-prior": 1, + "min-instruments": options.min_instruments, + "coincidence-threshold":options.coincidence_threshold, + "df": "bandwidth" + }, + input_files = { + "svd-file": one_ifo_svd_nodes[bin_key].output_files["write-svd"], + "mass-model-file": model_file, + "dtdphi-file": svd_dtdphi_map[bin_key], + "psd-xml": ref_psd + }, + output_files = {"write-likelihood": rankfile('CREATE_PRIOR_DIST_STATS', job=jobs['createPriorDistStats'])} + ) + # Create a file that has the priors *and* all of the diststats + # for a given bin marginalized over time. 
This is all that will + # be needed to compute the likelihood + diststats_per_bin_node = dagparts.DAGNode(jobs['marginalize'], dag, + parent_nodes = [priornode] + parents, + opts = {"marginalize": "ranking-stat"}, + input_cache_files = {"likelihood-cache": diststats + [priornode.output_files["write-likelihood"]]}, + output_files = {"output": rankfile('MARG_DIST_STATS', job=jobs['marginalize'])}, + input_cache_file_name = rankfile('MARG_DIST_STATS') + ) + + margnodes[bin_key] = diststats_per_bin_node + + return margnodes + + +def calc_rank_pdf_layer(dag, jobs, marg_nodes, options, boundary_seg, instrument_set): + rankpdf_nodes = [] + rankpdf_zerolag_nodes = [] + instruments = "".join(sorted(instrument_set)) + + # Here n counts the bins + for bin_key in sorted(marg_nodes.keys()): + rankfile = functools.partial(get_rank_file, instruments, boundary_seg, bin_key) + + calcranknode = dagparts.DAGNode(jobs['calcRankPDFs'], dag, + parent_nodes = [marg_nodes[bin_key]], + opts = {"ranking-stat-samples":options.ranking_stat_samples}, + input_files = {"": marg_nodes[bin_key].output_files["output"]}, + output_files = {"output": rankfile('CALC_RANK_PDFS', job=jobs['calcRankPDFs'])}, + ) + + calcrankzerolagnode = dagparts.DAGNode(jobs['calcRankPDFsWithZerolag'], dag, + parent_nodes = [marg_nodes[bin_key]], + opts = {"add-zerolag-to-background": "", "ranking-stat-samples": options.ranking_stat_samples}, + input_files = {"": marg_nodes[bin_key].output_files["output"]}, + output_files = {"output": rankfile('CALC_RANK_PDFS_WZL', job=jobs['calcRankPDFsWithZerolag'])}, + ) + + rankpdf_nodes.append(calcranknode) + rankpdf_zerolag_nodes.append(calcrankzerolagnode) + + return rankpdf_nodes, rankpdf_zerolag_nodes + + +def likelihood_layer(dag, jobs, marg_nodes, lloid_output, lloid_diststats, options, boundary_seg, instrument_set): + likelihood_nodes = {} + instruments = "".join(sorted(instrument_set)) + + # non-injection jobs + for bin_key in sorted(lloid_output[None].keys()): + outputs = lloid_output[None][bin_key] + diststats = lloid_diststats[bin_key] + inputs = [o[0] for o in outputs] + + # (input files for next job, dist stat files, parents for next job) + likelihood_nodes[None, bin_key] = (inputs, marg_nodes[bin_key].output_files["output"], [marg_nodes[bin_key]]) + + # injection jobs + for inj in options.injections: + lloid_nodes = lloid_output[sim_tag_from_inj_file(inj)] + for bin_key in sorted(lloid_nodes.keys()): + outputs = lloid_nodes[bin_key] + diststats = lloid_diststats[bin_key] + if outputs is not None: + inputs = [o[0] for o in outputs] + parents = dagparts.flatten([o[1] for o in outputs]) + + parents.append(marg_nodes[bin_key]) + likelihood_url = marg_nodes[bin_key].output_files["output"] + likelihood_nodes[sim_tag_from_inj_file(inj), bin_key] = (inputs, likelihood_url, parents) + + return likelihood_nodes + + +def sql_cluster_and_merge_layer(dag, jobs, likelihood_nodes, ligolw_add_nodes, options, boundary_seg, instruments): + num_chunks = 100 + innodes = {} + + # after assigning the likelihoods cluster and merge by sub bank and whether or not it was an injection run + for (sim_tag, bin_key), (inputs, likelihood_url, parents) in sorted(likelihood_nodes.items()): + db = inputs_to_db(jobs, inputs, job_type = 'toSqliteNoCache') + xml = inputs_to_db(jobs, inputs, job_type = 'ligolwAdd').replace(".sqlite", ".xml.gz") + snr_cluster_sql_file = options.snr_cluster_sql_file if sim_tag is None else options.injection_snr_cluster_sql_file + cluster_sql_file = options.cluster_sql_file if sim_tag is None else 
options.injection_sql_file
+
+		# cluster sub banks
+		cluster_node = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = parents,
+			opts = {"sql-file": snr_cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
+			input_files = {"":inputs}
+		)
+
+		# merge sub banks
+		merge_node = dagparts.DAGNode(jobs['ligolwAdd'], dag, parent_nodes = [cluster_node],
+			input_files = {"":inputs},
+			output_files = {"output":xml}
+		)
+
+		# cluster and simplify sub banks
+		cluster_node = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [merge_node],
+			opts = {"sql-file": snr_cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
+			input_files = {"":xml}
+		)
+
+		# assign likelihoods
+		likelihood_node = dagparts.DAGNode(jobs['calcLikelihood'], dag,
+			parent_nodes = [cluster_node],
+			opts = {"tmp-space":dagparts.condor_scratch_space()},
+			input_files = {"likelihood-url":likelihood_url, "": xml}
+		)
+
+		sqlitenode = dagparts.DAGNode(jobs['toSqliteNoCache'], dag, parent_nodes = [likelihood_node],
+			opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
+			input_files = {"":xml},
+			output_files = {"database":db},
+		)
+		sqlitenode = dagparts.DAGNode(jobs['lalappsRunSqlite'], dag, parent_nodes = [sqlitenode],
+			opts = {"sql-file": cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()},
+			input_files = {"":db}
+		)
+
+		innodes.setdefault(sim_tag_from_inj_file(sim_tag) if sim_tag else None, []).append(sqlitenode)
+
+	# make sure innodes has a None key, even if its value is an empty list
+	# FIXME injection dag is broken
+	innodes.setdefault(None, [])
+
+	if options.vetoes is None:
+		vetoes = []
+	else:
+		vetoes = [options.vetoes]
+
+	chunk_nodes = []
+	dbs_to_delete = []
+	# Process the chirp mass bins in chunks to parallelize the merging process
+	for chunk, nodes in enumerate(dagparts.groups(innodes[None], num_chunks)):
+		try:
+			dbs = [node.input_files[""] for node in nodes]
+			parents = nodes
+
+		except AttributeError:
+			# analysis started at merger step but seeded by lloid files which
+			# have already been merged into one file per background
+			# bin, thus the analysis will begin at this point
+			dbs = nodes
+			parents = []
+
+		dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs]
+		noninjdb = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite', path = jobs['toSqlite'].output_path)
+
+		# Merge and cluster the final non injection database
+		noninjsqlitenode = merge_cluster_layer(dag, jobs, parents, noninjdb, dbs, options.cluster_sql_file)
+		chunk_nodes.append(noninjsqlitenode)
+		dbs_to_delete.append(noninjdb)
+
+	# Merge the final non injection database
+	outnodes = []
+	injdbs = []
+	if options.non_injection_db: #### injection-only run
+		noninjdb = options.non_injection_db
+	else:
+		final_nodes = []
+		for chunk, nodes in enumerate(dagparts.groups(innodes[None], num_chunks)):
+			noninjdb = dagparts.T050017_filename(instruments, 'PART_LLOID_CHUNK_%04d' % chunk, boundary_seg, '.sqlite')
+
+			# cluster the final non injection database
+			noninjsqlitenode = merge_cluster_layer(dag, jobs, nodes, noninjdb, [node.input_files[""] for node in nodes], options.cluster_sql_file)
+			final_nodes.append(noninjsqlitenode)
+
+		input_files = (vetoes + [options.frame_segments_file])
+		input_cache_files = [node.input_files[""] for node in final_nodes]
+		noninjdb = dagparts.T050017_filename(instruments, 'ALL_LLOID', boundary_seg, '.sqlite')
+		noninjsqlitenode = merge_cluster_layer(dag, jobs, final_nodes, noninjdb, 
input_cache_files, options.cluster_sql_file, input_files=input_files) + + cpnode = dagparts.DAGNode(jobs['cp'], dag, parent_nodes = [noninjsqlitenode], + input_files = {"":"%s %s" % (noninjdb, noninjdb.replace('ALL_LLOID', 'ALL_LLOID_WZL'))} + ) + + outnodes.append(cpnode) + + if options.injections: + iterable_injections = options.injections + else: + iterable_injections = options.injections_for_merger + + for injections in iterable_injections: + # extract only the nodes that were used for injections + chunk_nodes = [] + + for chunk, injnodes in enumerate(dagparts.groups(innodes[sim_tag_from_inj_file(injections)], num_chunks)): + try: + dbs = [injnode.input_files[""] for injnode in injnodes] + parents = injnodes + except AttributeError: + dbs = injnodes + parents = [] + + # Setup the final output names, etc. + dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs] + injdb = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite', path = jobs['toSqlite'].output_path) + + # merge and cluster + clusternode = merge_cluster_layer(dag, jobs, parents, injdb, dbs, options.cluster_sql_file) + chunk_nodes.append(clusternode) + dbs_to_delete.append(injdb) + + + final_nodes = [] + for chunk, injnodes in enumerate(dagparts.groups(innodes[sim_tag_from_inj_file(injections)], num_chunks)): + # Setup the final output names, etc. + injdb = dagparts.T050017_filename(instruments, 'PART_LLOID_%s_CHUNK_%04d' % (sim_tag_from_inj_file(injections), chunk), boundary_seg, '.sqlite') + + # merge and cluster + clusternode = merge_cluster_layer(dag, jobs, injnodes, injdb, [node.input_files[""] for node in injnodes], options.cluster_sql_file) + final_nodes.append(clusternode) + + # Setup the final output names, etc. 
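+		# With the per-chunk injection databases built above, merge everything for this
+		# injection set into a single ALL_LLOID_<sim_tag> database, extract it to XML for
+		# the 'ligolwInspinjFind' job, load the result back into SQLite, and keep an
+		# ALL_LLOID_WZL copy for the zerolag inputs of compute_far_layer.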
+		injdb = dagparts.T050017_filename(instruments, 'ALL_LLOID_%s' % sim_tag_from_inj_file(injections), boundary_seg, '.sqlite')
+		injdbs.append(injdb)
+		injxml = injdb.replace('.sqlite','.xml.gz')
+
+		xml_input = injxml
+
+		# merge and cluster
+		parent_nodes = final_nodes + ligolw_add_nodes
+		input_files = (vetoes + [options.frame_segments_file, injections])
+		input_cache_files = [node.input_files[""] for node in final_nodes]
+		clusternode = merge_cluster_layer(dag, jobs, parent_nodes, injdb, input_cache_files, options.cluster_sql_file, input_files=input_files)
+
+		clusternode = dagparts.DAGNode(jobs['toXML'], dag, parent_nodes = [clusternode],
+			opts = {"tmp-space":dagparts.condor_scratch_space()},
+			output_files = {"extract":injxml},
+			input_files = {"database":injdb}
+		)
+
+		inspinjnode = dagparts.DAGNode(jobs['ligolwInspinjFind'], dag, parent_nodes = [clusternode],
+			opts = {"time-window":0.9},
+			input_files = {"":injxml}
+		)
+
+		sqlitenode = dagparts.DAGNode(jobs['toSqliteNoCache'], dag, parent_nodes = [inspinjnode],
+			opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space()},
+			output_files = {"database":injdb},
+			input_files = {"":xml_input}
+		)
+
+		cpnode = dagparts.DAGNode(jobs['cp'], dag, parent_nodes = [sqlitenode],
+			input_files = {"":"%s %s" % (injdb, injdb.replace('ALL_LLOID', 'ALL_LLOID_WZL'))}
+		)
+
+		outnodes.append(cpnode)
+
+	return injdbs, noninjdb, outnodes, dbs_to_delete
+
+
+def final_marginalize_layer(dag, jobs, rankpdf_nodes, rankpdf_zerolag_nodes, options):
+	ranknodes = [rankpdf_nodes, rankpdf_zerolag_nodes]
+	margjobs = [jobs['marginalize'], jobs['marginalizeWithZerolag']]
+	margfiles = [options.marginalized_likelihood_file, options.marginalized_likelihood_file]
+	filesuffixs = ['', '_with_zerolag']
+
+	margnum = 16
+	all_margcache = []
+	all_margnodes = []
+	final_margnodes = []
+	for nodes, job, margfile, filesuffix in zip(ranknodes, margjobs, margfiles, filesuffixs):
+		try:
+			margin = [node.output_files["output"] for node in nodes]
+			parents = nodes
+		except AttributeError: ### analysis started at merger step
+			margin = nodes
+			parents = []
+
+		margnodes = []
+		margcache = []
+
+		# split up the marginalization into chunks of margnum (= 16) files
+ for margchunk in dagparts.groups(margin, margnum): + if nodes: + marg_ce = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in margchunk] + margcache.append(dagparts.group_T050017_filename_from_T050017_files(marg_ce, '.xml.gz', path = job.output_path)) + margnodes.append(dagparts.DAGNode(job, dag, parent_nodes = parents, + opts = {"marginalize": "ranking-stat-pdf"}, + output_files = {"output": margcache[-1]}, + input_cache_files = {"likelihood-cache": margchunk}, + input_cache_file_name = os.path.basename(margcache[-1]).replace('.xml.gz','.cache') + )) + + all_margcache.append(margcache) + all_margnodes.append(margnodes) + + if not options.marginalized_likelihood_file: ### not an injection-only run + for nodes, job, margnodes, margcache, margfile, filesuffix in zip(ranknodes, margjobs, all_margnodes, all_margcache, margfiles, filesuffixs): + final_margnodes.append(dagparts.DAGNode(job, dag, parent_nodes = margnodes, + opts = {"marginalize": "ranking-stat-pdf"}, + output_files = {"output": "marginalized_likelihood%s.xml.gz"%filesuffix}, + input_cache_files = {"likelihood-cache": margcache}, + input_cache_file_name = "marginalized_likelihood%s.cache"%filesuffix + )) + + return final_margnodes, dagparts.flatten(all_margcache) + + +def compute_far_layer(dag, jobs, margnodes, injdbs, noninjdb, final_sqlite_nodes, options): + """compute FAPs and FARs + """ + margfiles = [options.marginalized_likelihood_file, options.marginalized_likelihood_file] + filesuffixs = ['', '_with_zerolag'] + + for margnode, margfile, filesuffix in zip(margnodes, margfiles, filesuffixs): + if options.marginalized_likelihood_file: ### injection-only run + parents = final_sqlite_nodes + marginalized_likelihood_file = margfile + + else: + parents = [margnode] + final_sqlite_nodes + marginalized_likelihood_file = margnode.output_files["output"] + + farnode = dagparts.DAGNode(jobs['ComputeFarFromSnrChisqHistograms'], dag, parent_nodes = parents, + opts = {"tmp-space":dagparts.condor_scratch_space()}, + input_files = { + "background-bins-file": marginalized_likelihood_file, + "injection-db": [injdb.replace('ALL_LLOID', 'ALL_LLOID_WZL') for injdb in injdbs] if 'zerolag' in filesuffix else injdbs, + "non-injection-db": noninjdb.replace('ALL_LLOID', 'ALL_LLOID_WZL') if 'zerolag' in filesuffix else noninjdb + } + ) + + if 'zerolag' not in filesuffix: + outnode = farnode + + return outnode + + +def horizon_dist_layer(dag, jobs, psd_nodes, options, boundary_seg, output_dir, instruments): + """calculate horizon distance + """ + dagparts.DAGNode(jobs['horizon'], dag, + parent_nodes = psd_nodes.values(), + input_files = {"":[node.output_files["write-psd"] for node in psd_nodes.values()]}, + output_files = {"":dagparts.T050017_filename(instruments, "HORIZON", boundary_seg, '.png', path = output_dir)} + ) + + +def summary_page_layer(dag, jobs, plotnodes, options, boundary_seg, injdbs, output_dir): + """create a summary page + """ + output_user_tags = ["ALL_LLOID_COMBINED", "PRECESSION_LLOID_COMBINED"] + output_user_tags.extend([injdb.replace(".sqlite","").split("-")[1] for injdb in injdbs]) + output_user_tags.extend([injdb.replace(".sqlite","").split("-")[1].replace("ALL_LLOID", "PRECESSION_LLOID") for injdb in injdbs]) + + dagparts.DAGNode(jobs['summaryPage'], dag, parent_nodes = plotnodes, + opts = { + "title":"gstlal-%d-%d-closed-box" % (int(boundary_seg[0]), int(boundary_seg[1])), + "webserver-dir":options.web_dir, + "glob-path":output_dir, + "output-user-tag":output_user_tags + } + ) # @@ 
-68,6 +1018,47 @@ def webserver_url(): raise NotImplementedError("I don't know where the webserver is for this environment") +# +# DAG utilities +# + + +def get_threshold_values(template_mchirp_dict, bgbin_indices, svd_bank_strings, options): + """Calculate the appropriate ht-gate-threshold values according to the scale given + """ + if options.ht_gate_threshold_linear is not None: + # A scale is given + mchirp_min, ht_gate_threshold_min, mchirp_max, ht_gate_threshold_max = [float(y) for x in options.ht_gate_threshold_linear.split("-") for y in x.split(":")] + # use max mchirp in a given svd bank to decide gate threshold + bank_mchirps = [template_mchirp_dict[bgbin_index][1] for bgbin_index in bgbin_indices] + gate_mchirp_ratio = (ht_gate_threshold_max - ht_gate_threshold_min)/(mchirp_max - mchirp_min) + return [gate_mchirp_ratio*(bank_mchirp - mchirp_min) + ht_gate_threshold_min for bank_mchirp in bank_mchirps] + elif options.ht_gate_threshold is not None: + return [options.ht_gate_threshold]*len(svd_bank_strings) # Use the ht-gate-threshold value given + else: + return None + + +def inputs_to_db(jobs, inputs, job_type = 'toSqlite'): + dbfiles = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in inputs] + db = dagparts.group_T050017_filename_from_T050017_files(dbfiles, '.sqlite') + return os.path.join(subdir_path([jobs[job_type].output_path, CacheEntry.from_T050017(db).description[:4]]), db) + + +def cache_to_db(cache, jobs): + hi_index = cache[-1].description.split('_')[0] + db = os.path.join(jobs['toSqlite'].output_path, os.path.basename(cache[-1].path)) + db.replace(hi_index, '%04d' % ((int(hi_index) + 1) / options.num_files_per_background_bin - 1,)) + return db + + +def get_rank_file(instruments, boundary_seg, n, basename, job=None): + if job: + return dagparts.T050017_filename(instruments, '_'.join([n, basename]), boundary_seg, '.xml.gz', path = job.output_path) + else: + return dagparts.T050017_filename(instruments, '_'.join([n, basename]), boundary_seg, '.cache') + + # # Utility functions # @@ -176,6 +1167,156 @@ def get_svd_bank_params(svd_bank_cache, online = False): return template_mchirp_dict +def sim_tag_from_inj_file(injections): + if injections is None: + return None + return injections.replace('.xml', '').replace('.gz', '').replace('-','_') + + +def get_bank_params(options, verbose = False): + bank_cache = {} + for bank_cache_str in options.bank_cache: + for c in bank_cache_str.split(','): + ifo = c.split("=")[0] + cache = c.replace(ifo+"=","") + bank_cache.setdefault(ifo, []).append(cache) + + max_time = 0 + template_mchirp_dict = {} + for n, cache in enumerate(bank_cache.values()[0]): + for ce in map(CacheEntry, open(cache)): + for ce in map(CacheEntry, open(ce.path)): + xmldoc = ligolw_utils.load_filename(ce.path, verbose = verbose, contenthandler = LIGOLWContentHandler) + snglinspiraltable = lsctables.SnglInspiralTable.get_table(xmldoc) + max_time = max(max_time, max(snglinspiraltable.getColumnByName('template_duration'))) + idx = options.overlap[n]/2 + template_mchirp_dict[ce.path] = [min(snglinspiraltable.getColumnByName('mchirp')[idx:-idx]), max(snglinspiraltable.getColumnByName('mchirp')[idx:-idx])] + xmldoc.unlink() + + return template_mchirp_dict, bank_cache, max_time + + +def subdir_path(dirlist): + output_path = '/'.join(dirlist) + try: + os.mkdir(output_path) + except: + pass + return output_path + + +def analysis_segments(analyzable_instruments_set, allsegs, boundary_seg, max_template_length, min_instruments = 2): + 
"""get a dictionary of all the disjoint 2+ detector combination segments + """ + segsdict = segments.segmentlistdict() + # 512 seconds for the whitener to settle + the maximum template_length FIXME don't hard code + start_pad = 512 + max_template_length + # Chosen so that the overlap is only a ~5% hit in run time for long segments... + segment_length = int(5 * start_pad) + for n in range(min_instruments, 1 + len(analyzable_instruments_set)): + for ifo_combos in itertools.combinations(list(analyzable_instruments_set), n): + segsdict[frozenset(ifo_combos)] = allsegs.intersection(ifo_combos) - allsegs.union(analyzable_instruments_set - set(ifo_combos)) + segsdict[frozenset(ifo_combos)] &= segments.segmentlist([boundary_seg]) + segsdict[frozenset(ifo_combos)] = segsdict[frozenset(ifo_combos)].protract(start_pad) + segsdict[frozenset(ifo_combos)] = dagparts.breakupsegs(segsdict[frozenset(ifo_combos)], segment_length, start_pad) + if not segsdict[frozenset(ifo_combos)]: + del segsdict[frozenset(ifo_combos)] + return segsdict + + +def create_svd_bank_strings(svd_nodes, instruments = None): + # FIXME assume that the number of svd nodes is the same per ifo, a good assumption though + outstrings = [] + for i in range(len(svd_nodes.values()[0])): + svd_bank_string = "" + for ifo in svd_nodes: + if instruments is not None and ifo not in instruments: + continue + try: + svd_bank_string += "%s:%s," % (ifo, svd_nodes[ifo][i].output_files["write-svd"]) + except AttributeError: + svd_bank_string += "%s:%s," % (ifo, svd_nodes[ifo][i]) + svd_bank_string = svd_bank_string.strip(",") + outstrings.append(svd_bank_string) + return outstrings + + +def svd_bank_cache_maker(svd_bank_strings, injection = False): + if injection: + dir_name = "gstlal_inspiral_inj" + else: + dir_name = "gstlal_inspiral" + svd_cache_entries = [] + parsed_svd_bank_strings = [inspiral.parse_svdbank_string(single_svd_bank_string) for single_svd_bank_string in svd_bank_strings] + for svd_bank_parsed_dict in parsed_svd_bank_strings: + for filename in svd_bank_parsed_dict.itervalues(): + svd_cache_entries.append(CacheEntry.from_T050017(filename)) + + return [svd_cache_entry.url for svd_cache_entry in svd_cache_entries] + + +def adapt_gstlal_inspiral_output(inspiral_nodes, options, segsdict): + # first get the previous output in a usable form + lloid_output = {} + for inj in options.injections + [None]: + lloid_output[sim_tag_from_inj_file(inj)] = {} + lloid_diststats = {} + if options.dist_stats_cache: + for ce in map(CacheEntry, open(options.dist_stats_cache)): + lloid_diststats[ce.description.split("_")[0]] = [ce.path] + for ifos in segsdict: + for seg in segsdict[ifos]: + # iterate over the mass space chunks for each segment + for node in inspiral_nodes[(ifos, None)][seg]: + if node is None: + break + len_out_files = len(node.output_files["output-cache"]) + for f in node.output_files["output-cache"]: + # Store the output files and the node for use as a parent dependency + lloid_output[None].setdefault(CacheEntry.from_T050017(f).description.split("_")[0], []).append((f, [node])) + for f in node.output_files["ranking-stat-output-cache"]: + lloid_diststats.setdefault(CacheEntry.from_T050017(f).description.split("_")[0] ,[]).append(f) + for inj in options.injections: + for injnode in inspiral_nodes[(ifos, sim_tag_from_inj_file(inj))][seg]: + if injnode is None: + continue + for f in injnode.output_files["output-cache"]: + # Store the output files and the node and injnode for use as a parent dependencies + bgbin_index = 
CacheEntry.from_T050017(f).description.split("_")[0] + try: + lloid_output[sim_tag_from_inj_file(inj)].setdefault(bgbin_index, []).append((f, lloid_output[None][bgbin_index][-1][1]+[injnode])) + except KeyError: + lloid_output[sim_tag_from_inj_file(inj)].setdefault(bgbin_index, []).append((f, [injnode])) + + return lloid_output, lloid_diststats + + +def set_up_scripts(options): + # Make an xml integrity checker + if options.gzip_test: + with open("gzip_test.sh", "w") as f: + f.write("#!/bin/bash\nsleep 60\ngzip --test $@") + os.chmod("gzip_test.sh", stat.S_IRUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH | stat.S_IWUSR) + + # A pre script to backup data before feeding to lossy programs + # (e.g. clustering routines) + with open("store_raw.sh", "w") as f: + f.write("""#!/bin/bash + for f in $@;do mkdir -p $(dirname $f)/raw;cp $f $(dirname $f)/raw/$(basename $f);done""") + os.chmod("store_raw.sh", stat.S_IRUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH | stat.S_IWUSR) + + +def load_reference_psd(options): + ref_psd = lal.series.read_psd_xmldoc(ligolw_utils.load_filename(options.reference_psd, verbose = options.verbose, contenthandler = lal.series.PSDContentHandler)) + + # FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache + with open('reference_psd.cache', "w") as output_cache_file: + output_cache_file.write("%s\n" % CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(options.reference_psd))) + + return ref_psd + + + if __name__ == "__main__": import doctest doctest.testmod()