diff --git a/gstlal-burst/bin/gstlal_feature_extractor_pipe b/gstlal-burst/bin/gstlal_feature_extractor_pipe
index 68037ebb2b2bdb17cb3fe8b00f27de2a2f350114..be9e6b15d44b5314be4e5d68761e098fd511d46b 100755
--- a/gstlal-burst/bin/gstlal_feature_extractor_pipe
+++ b/gstlal-burst/bin/gstlal_feature_extractor_pipe
@@ -35,8 +35,7 @@ import lal
 from ligo import segments
 
 from gstlal import aggregator
-from gstlal import inspiral_pipe
-from gstlal import dagparts as gstlaldagparts
+from gstlal import dagparts
 from gstlal.fxtools import feature_extractor
 from gstlal.fxtools import multichannel_datasource
 
@@ -107,7 +106,7 @@ def feature_extractor_node_gen(feature_extractor_job, dag, parent_nodes, segsdic
 		feature_end_time = min(int(seg[1]), options.gps_end_time)
 
 		feature_extractor_nodes[(ii, seg)] = \
-			inspiral_pipe.generic_node(feature_extractor_job, dag, parent_nodes = dep_nodes,
+			dagparts.DAGNode(feature_extractor_job, dag, parent_nodes = dep_nodes,
 				opts = {"gps-start-time":gps_start_time,
 					"gps-end-time":feature_end_time,
 					"feature-start-time":feature_start_time,
@@ -207,11 +206,11 @@ aggregator.makedir("logs")
 # set up dag and job classes
 #
-dag = inspiral_pipe.DAG("feature_extractor_pipe")
+dag = dagparts.DAG("feature_extractor_pipe")
 
 condor_options = {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}
-condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, condor_options)
-feature_extractor_job = inspiral_pipe.generic_job("gstlal_feature_extractor", condor_commands = condor_commands)
+condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, condor_options)
+feature_extractor_job = dagparts.DAGJob("gstlal_feature_extractor", condor_commands = condor_commands)
 
 segsdict = analysis_segments(ifo, data_source_info.frame_segments, data_source_info.seg, options.segment_length, max_template_length=max_template_length)
 
 #
diff --git a/gstlal-burst/bin/gstlal_ll_feature_extractor_pipe b/gstlal-burst/bin/gstlal_ll_feature_extractor_pipe
index 5c11c28453b8546940e6f0bfe96d0fc0397575db..0f40c675b04ae40035c452f25b9b13f2aec65a12 100755
--- a/gstlal-burst/bin/gstlal_ll_feature_extractor_pipe
+++ b/gstlal-burst/bin/gstlal_ll_feature_extractor_pipe
@@ -33,7 +33,6 @@ import os
 
 from gstlal import aggregator
 from gstlal import dagparts
-from gstlal import inspiral_pipe
 from gstlal.fxtools import feature_extractor
 from gstlal.fxtools import multichannel_datasource
 
@@ -130,7 +129,7 @@ def feature_extractor_node_gen(feature_extractor_job, dag, parent_nodes, ifo, op
 		subset_options.update(command_line_options)
 
 		feature_extractor_nodes[ii] = \
-			inspiral_pipe.generic_node(feature_extractor_job, dag, parent_nodes = parent_nodes,
+			dagparts.DAGNode(feature_extractor_job, dag, parent_nodes = parent_nodes,
 				opts = subset_options,
 				output_files = {"out-path": os.path.join(options.out_path, "gstlal_feature_extractor")}
 			)
@@ -153,14 +152,14 @@ def feature_extractor_node_gen(feature_extractor_job, dag, parent_nodes, ifo, op
 # =============================
 
 
-class zookeeper_job(inspiral_pipe.generic_job):
+class zookeeper_job(dagparts.DAGJob):
 	"""
 	A zookeeper job
 	"""
 	def __init__(self, program = "zookeeper-server-start.sh", datadir = os.path.join(dagparts.log_path(), "zookeeper"), port = 2271, maxclients = 0, condor_commands = {}):
 		"""
 		"""
-		inspiral_pipe.generic_job.__init__(self, program, universe = "local", condor_commands = condor_commands)
+		dagparts.DAGJob.__init__(self, program, universe = "local", condor_commands = condor_commands)
 
 		try:
 			os.mkdir(datadir)
@@ -179,14 +178,14 @@ maxClientCnxns=%d
 f.close()
 
 
-class kafka_job(inspiral_pipe.generic_job):
+class kafka_job(dagparts.DAGJob):
 	"""
 	A kafka job
 	"""
 	def __init__(self, program = "kafka-server-start.sh", logdir = os.path.join(dagparts.log_path(), "kafka"), host = "10.14.0.112:9182", zookeeperaddr = "localhost:2271", condor_commands = {}):
 		"""
 		"""
-		inspiral_pipe.generic_job.__init__(self, program, universe = "local", condor_commands = condor_commands)
+		dagparts.DAGJob.__init__(self, program, universe = "local", condor_commands = condor_commands)
 
 		try:
 			os.mkdir(logdir)
@@ -290,15 +289,15 @@ aggregator.makedir("logs")
 # set up dag and job classes
 #
 
-dag = inspiral_pipe.DAG("feature_extractor_pipe")
+dag = dagparts.DAG("feature_extractor_pipe")
 
 # feature extractor job
 if options.condor_universe == 'local':
 	condor_options = {"want_graceful_removal":"True", "kill_sig":"15"}
 else:
 	condor_options = {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}
-condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, condor_options)
-feature_extractor_job = inspiral_pipe.generic_job("gstlal_feature_extractor", condor_commands = condor_commands, universe = options.condor_universe)
+condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, condor_options)
+feature_extractor_job = dagparts.DAGJob("gstlal_feature_extractor", condor_commands = condor_commands, universe = options.condor_universe)
 feature_extractor_nodes, num_channels = feature_extractor_node_gen(feature_extractor_job, dag, [], ifo, options, data_source_info)
 
 # auxiliary jobs
@@ -307,15 +306,15 @@ if options.save_format == 'kafka':
 		auxiliary_condor_options = {"want_graceful_removal":"True", "kill_sig":"15"}
 	else:
 		auxiliary_condor_options = {"request_memory":options.auxiliary_request_memory, "request_cpus":options.auxiliary_request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}
-	auxiliary_condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, auxiliary_condor_options)
-	synchronizer_job = inspiral_pipe.generic_job("gstlal_feature_synchronizer", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
-	hdf5_sink_job = inspiral_pipe.generic_job("gstlal_feature_hdf5_sink", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
-	listener_job = inspiral_pipe.generic_job("gstlal_feature_listener", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
+	auxiliary_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, auxiliary_condor_options)
+	synchronizer_job = dagparts.DAGJob("gstlal_feature_synchronizer", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
+	hdf5_sink_job = dagparts.DAGJob("gstlal_feature_hdf5_sink", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
+	listener_job = dagparts.DAGJob("gstlal_feature_listener", condor_commands = auxiliary_condor_commands, universe = options.condor_universe)
 
 	if not options.disable_kafka_jobs:
 		# kafka/zookeeper jobs
 		local_condor_options = {"want_graceful_removal":"True", "kill_sig":"15"}
-		local_condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, local_condor_options)
+		local_condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, local_condor_options)
 		zoo_job = zookeeper_job(condor_commands = local_condor_commands)
 		kafka_job = kafka_job(condor_commands = local_condor_commands, host = options.kafka_server)
 
@@ -380,13 +379,13 @@ if options.save_format == 'kafka':
 	if options.save_format == 'kafka':
 		synchronizer_options.update({"num-topics": len(feature_extractor_nodes)})
 
-	synchronizer_node = inspiral_pipe.generic_node(synchronizer_job, dag, [], opts = synchronizer_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_synchronizer")})
-	hdf5_sink_node = inspiral_pipe.generic_node(hdf5_sink_job, dag, [], opts = hdf5_sink_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_hdf5_sink")})
-	listener_node = inspiral_pipe.generic_node(listener_job, dag, [], opts = listener_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_listener")})
+	synchronizer_node = dagparts.DAGNode(synchronizer_job, dag, [], opts = synchronizer_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_synchronizer")})
+	hdf5_sink_node = dagparts.DAGNode(hdf5_sink_job, dag, [], opts = hdf5_sink_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_hdf5_sink")})
+	listener_node = dagparts.DAGNode(listener_job, dag, [], opts = listener_options, output_files = {"rootdir": os.path.join(options.out_path, "gstlal_feature_listener")})
 
 	if not options.disable_kafka_jobs:
-		zoo_node = inspiral_pipe.generic_node(zoo_job, dag, [], opts = {"":"zookeeper.properties"})
-		kafka_node = inspiral_pipe.generic_node(kafka_job, dag, [], opts = {"":"kafka.properties"})
+		zoo_node = dagparts.DAGNode(zoo_job, dag, [], opts = {"":"zookeeper.properties"})
+		kafka_node = dagparts.DAGNode(kafka_job, dag, [], opts = {"":"kafka.properties"})
 
 #
 # write out dag and sub files
diff --git a/gstlal-inspiral/bin/gstlal_bank_splitter b/gstlal-inspiral/bin/gstlal_bank_splitter
index ff489177c48ac77b89f2bd5f8b62d313c06832b8..14be959e65456ff73f7278726703de6929df4899 100755
--- a/gstlal-inspiral/bin/gstlal_bank_splitter
+++ b/gstlal-inspiral/bin/gstlal_bank_splitter
@@ -29,6 +29,7 @@ from glue.ligolw import utils as ligolw_utils
 from glue.ligolw.utils import process as ligolw_process
 from gstlal import templates
 from gstlal import inspiral_pipe
+from gstlal import dagparts
 from gstlal import chirptime
 import lal
 from lal.utils import CacheEntry
@@ -216,7 +217,7 @@ with open(options.output_cache, "w") as output_cache_file:
 			row.mtotal = row.mass1 + row.mass2
 		sngl_inspiral_table[:] = rows
 
-		output = inspiral_pipe.T050017_filename(options.instrument, "%04d_GSTLAL_SPLIT_BANK" % bank_count, (0, 0), ".xml.gz", path = options.output_path)
+		output = dagparts.T050017_filename(options.instrument, "%04d_GSTLAL_SPLIT_BANK" % bank_count, (0, 0), ".xml.gz", path = options.output_path)
 		if not options.write_svd_caches:
 			output_cache_file.write("%s\n" % CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(output)))
 		else:
@@ -232,7 +233,7 @@ with open(options.output_cache, "w") as output_cache_file:
 			pass
 
 		for svd_cache_group in inspiral_pipe.group(svd_caches, options.num_banks):
-			output = inspiral_pipe.T050017_filename(options.instrument, "%04d_%04d_GSTLAL_SPLIT_BANK" % (svd_cache_group[0][0], svd_cache_group[-1][0]), (0, 0), ".cache", path = cache_path)
+			output = dagparts.T050017_filename(options.instrument, "%04d_%04d_GSTLAL_SPLIT_BANK" % (svd_cache_group[0][0], svd_cache_group[-1][0]), (0, 0), ".cache", path = cache_path)
 			with open(output, 'w') as svd_cache:
 				for bank_count, split_bank_cache_entry in svd_cache_group:
 					svd_cache.write(split_bank_cache_entry)
diff --git a/gstlal-inspiral/bin/gstlal_inspiral b/gstlal-inspiral/bin/gstlal_inspiral
index cbd6a36d69f5e13914e2cc50a619950a15e02850..0db35cf91b84a8c6fce96e774b5f9e535f1cdc8c 100755
--- a/gstlal-inspiral/bin/gstlal_inspiral
+++ b/gstlal-inspiral/bin/gstlal_inspiral
@@ -173,7 +173,6 @@ from gstlal import far
 from gstlal import httpinterface
 from gstlal import hoftcache
 from gstlal import inspiral
-from gstlal import inspiral_pipe
 from gstlal import lloidhandler
 from gstlal import lloidparts
 from gstlal import pipeparts
diff --git a/gstlal-inspiral/bin/gstlal_inspiral_create_dt_dphi_snr_ratio_pdfs_dag b/gstlal-inspiral/bin/gstlal_inspiral_create_dt_dphi_snr_ratio_pdfs_dag
index c909230452fb442998a1e3b5ff9621db5d87fac9..cb179cf549db588ed633350caa9016e5d10fd673 100755
--- a/gstlal-inspiral/bin/gstlal_inspiral_create_dt_dphi_snr_ratio_pdfs_dag
+++ b/gstlal-inspiral/bin/gstlal_inspiral_create_dt_dphi_snr_ratio_pdfs_dag
@@ -18,27 +18,26 @@
 import os
 from glue import pipeline
-from gstlal import inspiral_pipe
-from gstlal import dagparts as gstlaldagparts
+from gstlal import dagparts
 
 try:
 	os.mkdir("logs")
 except:
 	pass
 
-dag = inspiral_pipe.DAG("dt_dphi")
+dag = dagparts.DAG("dt_dphi")
 
-margJob = inspiral_pipe.generic_job("gstlal_inspiral_create_dt_dphi_snr_ratio_pdfs", condor_commands = {"request_memory":"8GB", "want_graceful_removal":"True", "kill_sig":"15", "accounting_group":"ligo.prod.o2.cbc.uber.gstlaloffline"})
-addJob = inspiral_pipe.generic_job("gstlal_inspiral_add_dt_dphi_snr_ratio_pdfs", condor_commands = {"request_memory":"4GB", "want_graceful_removal":"True", "kill_sig":"15", "accounting_group":"ligo.prod.o2.cbc.uber.gstlaloffline"})
+margJob = dagparts.DAGJob("gstlal_inspiral_create_dt_dphi_snr_ratio_pdfs", condor_commands = {"request_memory":"8GB", "want_graceful_removal":"True", "kill_sig":"15", "accounting_group":"ligo.prod.o2.cbc.uber.gstlaloffline"})
+addJob = dagparts.DAGJob("gstlal_inspiral_add_dt_dphi_snr_ratio_pdfs", condor_commands = {"request_memory":"4GB", "want_graceful_removal":"True", "kill_sig":"15", "accounting_group":"ligo.prod.o2.cbc.uber.gstlaloffline"})
 
 num = 1000
 margnodes = []
 
 # FIXME dont hardcode 3345408, it comes from number of tiles in TimePhaseSNR
 for start in range(0, 3345408, num):
 	stop = start + num
-	margnodes.append(inspiral_pipe.generic_node(margJob, dag, parent_nodes = [], opts = {"start":str(start), "stop":str(stop)}, output_files = {"output":"%s/inspiral_dtdphi_pdf_%d_%d.h5" % (margJob.output_path, start, stop)}))
+	margnodes.append(dagparts.DAGNode(margJob, dag, parent_nodes = [], opts = {"start":str(start), "stop":str(stop)}, output_files = {"output":"%s/inspiral_dtdphi_pdf_%d_%d.h5" % (margJob.output_path, start, stop)}))
 
-addnode = inspiral_pipe.generic_node(addJob, dag, parent_nodes = margnodes, input_files = {"": [n.output_files["output"] for n in margnodes]})
+addnode = dagparts.DAGNode(addJob, dag, parent_nodes = margnodes, input_files = {"": [n.output_files["output"] for n in margnodes]})
 
 dag.write_sub_files()
 dag.write_dag()
diff --git a/gstlal-inspiral/bin/gstlal_inspiral_pipe b/gstlal-inspiral/bin/gstlal_inspiral_pipe
index a53afc75c58d6dba888bec9c9644a37917fad843..174bebfdbd0216bd69c67f853ee80495364c8573 100755
--- a/gstlal-inspiral/bin/gstlal_inspiral_pipe
+++ b/gstlal-inspiral/bin/gstlal_inspiral_pipe
@@ -58,7 +58,7 @@ from glue.ligolw import lsctables
 import glue.ligolw.utils as ligolw_utils
 import glue.ligolw.utils.segments as ligolw_segments
 from
gstlal import inspiral, inspiral_pipe -from gstlal import dagparts as gstlaldagparts +from gstlal import dagparts from gstlal import datasource class LIGOLWContentHandler(ligolw.LIGOLWContentHandler): @@ -125,7 +125,7 @@ def analysis_segments(analyzable_instruments_set, allsegs, boundary_seg, max_tem segsdict[frozenset(ifo_combos)] = allsegs.intersection(ifo_combos) - allsegs.union(analyzable_instruments_set - set(ifo_combos)) segsdict[frozenset(ifo_combos)] &= segments.segmentlist([boundary_seg]) segsdict[frozenset(ifo_combos)] = segsdict[frozenset(ifo_combos)].protract(start_pad) - segsdict[frozenset(ifo_combos)] = gstlaldagparts.breakupsegs(segsdict[frozenset(ifo_combos)], segment_length, start_pad) + segsdict[frozenset(ifo_combos)] = dagparts.breakupsegs(segsdict[frozenset(ifo_combos)], segment_length, start_pad) if not segsdict[frozenset(ifo_combos)]: del segsdict[frozenset(ifo_combos)] return segsdict @@ -136,7 +136,7 @@ def psd_node_gen(refPSDJob, dag, parent_nodes, segsdict, channel_dict, options): this_channel_dict = dict((k, channel_dict[k]) for k in ifos if k in channel_dict) for seg in segsdict[ifos]: psd_nodes[(ifos, seg)] = \ - inspiral_pipe.generic_node(refPSDJob, dag, parent_nodes = parent_nodes, + dagparts.DAGNode(refPSDJob, dag, parent_nodes = parent_nodes, opts = {"gps-start-time":int(seg[0]), "gps-end-time":int(seg[1]), "data-source":"frames", @@ -145,7 +145,7 @@ def psd_node_gen(refPSDJob, dag, parent_nodes, segsdict, channel_dict, options): "frame-segments-name": options.frame_segments_name}, input_files = { "frame-cache":options.frame_cache, "frame-segments-file":options.frame_segments_file}, - output_files = {"write-psd":inspiral_pipe.T050017_filename(ifos, "REFERENCE_PSD", seg, '.xml.gz', path = subdir_path([refPSDJob.output_path, str(int(seg[0]))[:5]]))} + output_files = {"write-psd":dagparts.T050017_filename(ifos, "REFERENCE_PSD", seg, '.xml.gz', path = subdir_path([refPSDJob.output_path, str(int(seg[0]))[:5]]))} ) return psd_nodes @@ -167,8 +167,8 @@ def inj_psd_node_gen(segsdict, options): def model_node_gen(modelJob, dag, parent_nodes, instruments, options, seg, template_bank, psd): if options.mass_model_file is None: # choose, arbitrarily, the lowest instrument in alphabetical order - model_file_name = inspiral_pipe.T050017_filename(instruments, 'ALL_MASS_MODEL', seg, '.h5', path = modelJob.output_path) - model_node = inspiral_pipe.generic_node(modelJob, dag, + model_file_name = dagparts.T050017_filename(instruments, 'ALL_MASS_MODEL', seg, '.h5', path = modelJob.output_path) + model_node = dagparts.DAGNode(modelJob, dag, input_files = {"template-bank": template_bank, "reference-psd": psd}, opts = {"model":options.mass_model}, output_files = {"output": model_file_name}, @@ -200,11 +200,11 @@ def svd_node_gen(svdJob, dag, parent_nodes, psd, bank_cache, options, seg, templ if f in template_mchirp_dict: mchirp_interval = (min(mchirp_interval[0], template_mchirp_dict[f][0]), max(mchirp_interval[1], template_mchirp_dict[f][1])) - svd_bank_name = inspiral_pipe.T050017_filename(ifo, '%04d_SVD' % (i+bin_offset,), seg, '.xml.gz', path = svdJob.output_path) + svd_bank_name = dagparts.T050017_filename(ifo, '%04d_SVD' % (i+bin_offset,), seg, '.xml.gz', path = svdJob.output_path) if '%04d' % (i+bin_offset,) not in new_template_mchirp_dict and mchirp_interval != (float("inf"), 0): new_template_mchirp_dict['%04d' % (i+bin_offset,)] = mchirp_interval - svdnode = inspiral_pipe.generic_node(svdJob, dag, + svdnode = dagparts.DAGNode(svdJob, dag, parent_nodes = parent_nodes, opts 
= {"svd-tolerance":options.tolerance, "flow":options.flow, @@ -233,7 +233,7 @@ def svd_node_gen(svdJob, dag, parent_nodes, psd, bank_cache, options, seg, templ # primary_ifo = bank_cache.keys()[0] - inspiral_pipe.generic_node(plotBanksJob, dag, + dagparts.DAGNode(plotBanksJob, dag, parent_nodes = sum(svd_nodes.values(),[]), opts = {"plot-template-bank":"", "output-dir": output_dir}, @@ -323,8 +323,8 @@ def inspiral_node_gen(gstlalInspiralJob, gstlalInspiralInjJob, dag, svd_nodes, s bgbin_indices = ['%04d' % (i + numchunks * chunk_counter,) for i,s in enumerate(svd_bank_strings)] # setup output names output_paths = [subdir_path([output_seg_path, bgbin_indices[i]]) for i, s in enumerate(svd_bank_strings)] - output_names = [inspiral_pipe.T050017_filename(ifos, '%s_LLOID' % (bgbin_indices[i],), seg, '.xml.gz', path = output_paths[i]) for i, s in enumerate(svd_bank_strings)] - dist_stat_names = [inspiral_pipe.T050017_filename(ifos, '%s_DIST_STATS' % (bgbin_indices[i],), seg, '.xml.gz', path = output_paths[i]) for i,s in enumerate(svd_bank_strings)] + output_names = [dagparts.T050017_filename(ifos, '%s_LLOID' % (bgbin_indices[i],), seg, '.xml.gz', path = output_paths[i]) for i, s in enumerate(svd_bank_strings)] + dist_stat_names = [dagparts.T050017_filename(ifos, '%s_DIST_STATS' % (bgbin_indices[i],), seg, '.xml.gz', path = output_paths[i]) for i,s in enumerate(svd_bank_strings)] for bgbin in bgbin_indices: bgbin_chunk_map.setdefault(bgbin, chunk_counter) @@ -342,7 +342,7 @@ def inspiral_node_gen(gstlalInspiralJob, gstlalInspiralInjJob, dag, svd_nodes, s threshold_values = [options.ht_gate_threshold]*len(svd_bank_strings) # Use the ht-gate-threshold value given # non injection node - noninjnode = inspiral_pipe.generic_node(gstlalInspiralJob, dag, + noninjnode = dagparts.DAGNode(gstlalInspiralJob, dag, parent_nodes = sum((svd_node_list[numchunks*chunk_counter:numchunks*(chunk_counter+1)] for svd_node_list in svd_nodes.values()),[]), opts = {"psd-fft-length":options.psd_fft_length, "ht-gate-threshold":threshold_values, @@ -350,7 +350,7 @@ def inspiral_node_gen(gstlalInspiralJob, gstlalInspiralInjJob, dag, svd_nodes, s "gps-start-time":int(seg[0]), "gps-end-time":int(seg[1]), "channel-name":datasource.pipeline_channel_list_from_channel_dict(this_channel_dict), - "tmp-space":inspiral_pipe.condor_scratch_space(), + "tmp-space":dagparts.condor_scratch_space(), "track-psd":"", "control-peak-time":options.control_peak_time, "coincidence-threshold":options.coincidence_threshold, @@ -392,7 +392,7 @@ def inspiral_node_gen(gstlalInspiralJob, gstlalInspiralInjJob, dag, svd_nodes, s for chunk_counter, bgbin_list in enumerate(chunks(bgbin_svd_bank_strings, numchunks)): bgbin_indices, svd_bank_strings = zip(*bgbin_list) output_paths = [subdir_path([output_seg_inj_path, bgbin_index]) for bgbin_index in bgbin_indices] - output_names = [inspiral_pipe.T050017_filename(ifos, '%s_LLOID_%s' % (bgbin_index, sim_name), seg, '.xml.gz', path = output_paths[i]) for i, bgbin_index in enumerate(bgbin_indices)] + output_names = [dagparts.T050017_filename(ifos, '%s_LLOID_%s' % (bgbin_index, sim_name), seg, '.xml.gz', path = output_paths[i]) for i, bgbin_index in enumerate(bgbin_indices)] svd_names = [s for i, s in enumerate(svd_bank_cache_maker(svd_bank_strings, injection = True))] try: reference_psd = psd_nodes[(ifos, seg)].output_files["write-psd"] @@ -415,14 +415,14 @@ def inspiral_node_gen(gstlalInspiralJob, gstlalInspiralInjJob, dag, svd_nodes, s threshold_values = [options.ht_gate_threshold]*len(svd_bank_strings) # Use 
the ht-gate-threshold value given # setup injection node - injnode = inspiral_pipe.generic_node(gstlalInspiralInjJob, dag, parent_nodes = parents, + injnode = dagparts.DAGNode(gstlalInspiralInjJob, dag, parent_nodes = parents, opts = {"psd-fft-length":options.psd_fft_length, "ht-gate-threshold":threshold_values, "frame-segments-name":options.frame_segments_name, "gps-start-time":int(seg[0]), "gps-end-time":int(seg[1]), "channel-name":datasource.pipeline_channel_list_from_channel_dict(this_channel_dict), - "tmp-space":inspiral_pipe.condor_scratch_space(), + "tmp-space":dagparts.condor_scratch_space(), "track-psd":"", "control-peak-time":options.control_peak_time, "coincidence-threshold":options.coincidence_threshold, @@ -441,7 +441,7 @@ def inspiral_node_gen(gstlalInspiralJob, gstlalInspiralInjJob, dag, svd_nodes, s "injections": injections }, input_cache_files = {"svd-bank-cache":svd_names}, - input_cache_file_name = inspiral_pipe.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017(filename) for filename in svd_names], '.cache').replace('SVD', 'SVD_%s' % sim_name), + input_cache_file_name = dagparts.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017(filename) for filename in svd_names], '.cache').replace('SVD', 'SVD_%s' % sim_name), output_cache_files = { "output-cache":output_names } @@ -517,35 +517,35 @@ def rank_and_merge(dag, createPriorDistStatsJob, calcRankPDFsJob, calcRankPDFsWi # FIXME we keep this here in case we someday want to have a # mass bin dependent prior, but it really doesn't matter for # the time being. - priornode = inspiral_pipe.generic_node(createPriorDistStatsJob, dag, + priornode = dagparts.DAGNode(createPriorDistStatsJob, dag, parent_nodes = [one_ifo_svd_nodes[n]] + mass_model_add_node, opts = {"instrument":instrument_set, "background-prior":1, "min-instruments":options.min_instruments}, input_files = {"svd-file":one_ifo_svd_nodes[n].output_files["write-svd"], "mass-model-file":mass_model_file}, - output_files = {"write-likelihood":inspiral_pipe.T050017_filename(instruments, '%04d_CREATE_PRIOR_DIST_STATS' % (n,), boundary_seg, '.xml.gz', path = createPriorDistStatsJob.output_path)} + output_files = {"write-likelihood":dagparts.T050017_filename(instruments, '%04d_CREATE_PRIOR_DIST_STATS' % (n,), boundary_seg, '.xml.gz', path = createPriorDistStatsJob.output_path)} ) # Create a file that has the priors *and* all of the diststats # for a given bin marginalized over time. 
This is all that will # be needed to compute the likelihood - diststats_per_bin_node = inspiral_pipe.generic_node(marginalizeJob, dag, + diststats_per_bin_node = dagparts.DAGNode(marginalizeJob, dag, parent_nodes = [priornode] + parents, opts = {"marginalize":"ranking-stat"}, input_cache_files = {"likelihood-cache":diststats + [priornode.output_files["write-likelihood"]]}, - output_files = {"output":inspiral_pipe.T050017_filename(instruments, '%04d_MARG_DIST_STATS' % (n,), boundary_seg, '.xml.gz', path = marginalizeJob.output_path)}, - input_cache_file_name = inspiral_pipe.T050017_filename(instruments, '%04d_MARG_DIST_STATS' % (n,), boundary_seg, '.cache') + output_files = {"output":dagparts.T050017_filename(instruments, '%04d_MARG_DIST_STATS' % (n,), boundary_seg, '.xml.gz', path = marginalizeJob.output_path)}, + input_cache_file_name = dagparts.T050017_filename(instruments, '%04d_MARG_DIST_STATS' % (n,), boundary_seg, '.cache') ) - calcranknode = inspiral_pipe.generic_node(calcRankPDFsJob, dag, + calcranknode = dagparts.DAGNode(calcRankPDFsJob, dag, parent_nodes = [diststats_per_bin_node], opts = {"ranking-stat-samples":options.ranking_stat_samples}, input_files = {"":diststats_per_bin_node.output_files["output"]}, - output_files = {"output":inspiral_pipe.T050017_filename(instruments, '%04d_CALC_RANK_PDFS' % (n,), boundary_seg, '.xml.gz', path = calcRankPDFsJob.output_path)} + output_files = {"output":dagparts.T050017_filename(instruments, '%04d_CALC_RANK_PDFS' % (n,), boundary_seg, '.xml.gz', path = calcRankPDFsJob.output_path)} ) - calcrankzerolagnode = inspiral_pipe.generic_node(calcRankPDFsWithZerolagJob, dag, + calcrankzerolagnode = dagparts.DAGNode(calcRankPDFsWithZerolagJob, dag, parent_nodes = [diststats_per_bin_node], opts = {"add-zerolag-to-background":"","ranking-stat-samples":options.ranking_stat_samples}, input_files = {"":diststats_per_bin_node.output_files["output"]}, - output_files = {"output":inspiral_pipe.T050017_filename(instruments, '%04d_CALC_RANK_PDFS_WZL' % (n,), boundary_seg, '.xml.gz', path = calcRankPDFsWithZerolagJob.output_path)} + output_files = {"output":dagparts.T050017_filename(instruments, '%04d_CALC_RANK_PDFS_WZL' % (n,), boundary_seg, '.xml.gz', path = calcRankPDFsWithZerolagJob.output_path)} ) margnodes['%04d' %(n,)] = diststats_per_bin_node @@ -554,9 +554,9 @@ def rank_and_merge(dag, createPriorDistStatsJob, calcRankPDFsJob, calcRankPDFsWi # Break up the likelihood jobs into chunks to process fewer files, e.g, 16 likelihood_nodes.setdefault(None,[]).append( - [inspiral_pipe.generic_node(calcLikelihoodJob, dag, + [dagparts.DAGNode(calcLikelihoodJob, dag, parent_nodes = [diststats_per_bin_node], - opts = {"tmp-space":inspiral_pipe.condor_scratch_space()}, + opts = {"tmp-space":dagparts.condor_scratch_space()}, input_files = {"likelihood-url":diststats_per_bin_node.output_files["output"]}, input_cache_files = {"input-cache":chunked_inputs} ) for chunked_inputs in chunks(inputs, 16)] @@ -577,9 +577,9 @@ def rank_and_merge(dag, createPriorDistStatsJob, calcRankPDFsJob, calcRankPDFsWi likelihood_url = diststats[0] # Break up the likelihood jobs into chunks to process fewer files, e.g., 16 likelihood_nodes.setdefault(sim_tag_from_inj_file(inj),[]).append( - [inspiral_pipe.generic_node(calcLikelihoodJobInj, dag, + [dagparts.DAGNode(calcLikelihoodJobInj, dag, parent_nodes = parents, - opts = {"tmp-space":inspiral_pipe.condor_scratch_space()}, + opts = {"tmp-space":dagparts.condor_scratch_space()}, input_files = {"likelihood-url":likelihood_url}, 
input_cache_files = {"input-cache":chunked_inputs} ) for chunked_inputs in chunks(inputs, 16)] @@ -597,8 +597,8 @@ def rank_and_merge(dag, createPriorDistStatsJob, calcRankPDFsJob, calcRankPDFsWi if inj is None: # files_to_group at a time irrespective of the sub bank they came from so the jobs take a bit longer to run for n in range(0, len(inputs), files_to_group): - merge_nodes.append(inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = nodes, - opts = {"sql-file":options.cluster_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()}, + merge_nodes.append(dagparts.DAGNode(lalappsRunSqliteJob, dag, parent_nodes = nodes, + opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()}, input_files = {"":inputs[n:n+files_to_group]} ) ) @@ -608,24 +608,24 @@ def rank_and_merge(dag, createPriorDistStatsJob, calcRankPDFsJob, calcRankPDFsWi # Merging all the dbs from the same sub bank for subbank, inputs in enumerate([node.input_files["input-cache"] for node in nodes]): - db = inspiral_pipe.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in inputs], '.sqlite') + db = dagparts.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in inputs], '.sqlite') db = os.path.join(subdir_path([toSqliteJob.output_path, CacheEntry.from_T050017(db).description[:4]]), db) - sqlitenode = inspiral_pipe.generic_node(toSqliteJob, dag, parent_nodes = merge_nodes, - opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""}, + sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = merge_nodes, + opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space(), "ilwdchar-compat":""}, input_cache_files = {"input-cache":inputs}, output_files = {"database":db}, input_cache_file_name = os.path.basename(db).replace('.sqlite','.cache') ) - sqlitenode = inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode], - opts = {"sql-file":options.cluster_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()}, + sqlitenode = dagparts.DAGNode(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode], + opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()}, input_files = {"":db} ) outnodes.setdefault(None, []).append(sqlitenode) else: # files_to_group at a time irrespective of the sub bank they came from so the jobs take a bit longer to run for n in range(0, len(inputs), files_to_group): - merge_nodes.append(inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = nodes, - opts = {"sql-file":options.injection_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()}, + merge_nodes.append(dagparts.DAGNode(lalappsRunSqliteJob, dag, parent_nodes = nodes, + opts = {"sql-file":options.injection_sql_file, "tmp-space":dagparts.condor_scratch_space()}, input_files = {"":inputs[n:n+files_to_group]} ) ) @@ -635,16 +635,16 @@ def rank_and_merge(dag, createPriorDistStatsJob, calcRankPDFsJob, calcRankPDFsWi # Merging all the dbs from the same sub bank and injection run for subbank, inputs in enumerate([node.input_files["input-cache"] for node in nodes]): - injdb = inspiral_pipe.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in inputs], '.sqlite') + injdb = 
dagparts.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in inputs], '.sqlite') injdb = os.path.join(subdir_path([toSqliteJob.output_path, CacheEntry.from_T050017(injdb).description[:4]]), injdb) - sqlitenode = inspiral_pipe.generic_node(toSqliteJob, dag, parent_nodes = merge_nodes, - opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""}, + sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = merge_nodes, + opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space(), "ilwdchar-compat":""}, input_cache_files = {"input-cache":inputs}, output_files = {"database":injdb}, input_cache_file_name = os.path.basename(injdb).replace('.sqlite','.cache') ) - sqlitenode = inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode], - opts = {"sql-file":options.injection_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()}, + sqlitenode = dagparts.DAGNode(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode], + opts = {"sql-file":options.injection_sql_file, "tmp-space":dagparts.condor_scratch_space()}, input_files = {"":injdb} ) outnodes.setdefault(sim_tag_from_inj_file(inj), []).append(sqlitenode) @@ -673,17 +673,17 @@ def merge_in_bin(dag, toSqliteJob, lalappsRunSqliteJob, options): else: for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k)): - noninjdb = inspiral_pipe.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs], '.sqlite', path = toSqliteJob.output_path) + noninjdb = dagparts.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs], '.sqlite', path = toSqliteJob.output_path) # merge all of the dbs from the same subbank - sqlitenode = inspiral_pipe.generic_node(toSqliteJob, dag, parent_nodes = [], - opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""}, + sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = [], + opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space(), "ilwdchar-compat":""}, input_cache_files = {"input-cache":dbs}, output_files = {"database":noninjdb}, input_cache_file_name = os.path.basename(noninjdb).replace('.sqlite','.cache') ) - sqlitenode = inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode], - opts = {"sql-file":options.cluster_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()}, + sqlitenode = dagparts.DAGNode(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode], + opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()}, input_files = {"":noninjdb} ) @@ -694,17 +694,17 @@ def merge_in_bin(dag, toSqliteJob, lalappsRunSqliteJob, options): bgbin_lloid_map.setdefault(ce.description.split('_')[0], []).append(ce.path) for bgbin_index, dbs in sorted(bgbin_lloid_map.items(), key = lambda (k,v): int(k)): - injdb = inspiral_pipe.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs], '.sqlite', path = toSqliteJob.output_path) + injdb = dagparts.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs], '.sqlite', path = toSqliteJob.output_path) # merge all of the dbs from the same subbank - sqlitenode = 
inspiral_pipe.generic_node(toSqliteJob, dag, parent_nodes = [], - opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""}, + sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = [], + opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space(), "ilwdchar-compat":""}, input_cache_files = {"input-cache":dbs}, output_files = {"database":injdb}, input_cache_file_name = os.path.basename(injdb).replace('.sqlite','.cache') ) - sqlitenode = inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode], - opts = {"sql-file":options.injection_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()}, + sqlitenode = dagparts.DAGNode(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode], + opts = {"sql-file":options.injection_sql_file, "tmp-space":dagparts.condor_scratch_space()}, input_files = {"":injdb} ) @@ -722,15 +722,15 @@ def merge_in_bin(dag, toSqliteJob, lalappsRunSqliteJob, options): noninjdb = os.path.join(toSqliteJob.output_path, os.path.basename(ce_list[-1].path)).replace(hi_index, '%04d' % ((int(hi_index) + 1) / options.num_files_per_background_bin - 1,)) # merge all of the dbs from the same subbank - sqlitenode = inspiral_pipe.generic_node(toSqliteJob, dag, parent_nodes = [], - opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""}, + sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = [], + opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space(), "ilwdchar-compat":""}, input_cache_files = {"input-cache":[ce.path for ce in ce_list]}, output_files = {"database":noninjdb}, input_cache_file_name = os.path.basename(noninjdb).replace('.sqlite','.cache') ) - sqlitenode = inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode], - opts = {"sql-file":options.cluster_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()}, + sqlitenode = dagparts.DAGNode(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode], + opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()}, input_files = {"":noninjdb} ) @@ -742,15 +742,15 @@ def merge_in_bin(dag, toSqliteJob, lalappsRunSqliteJob, options): injdb = os.path.join(toSqliteJob.output_path, os.path.basename(ce_list[-1].path)).replace(hi_index, '%04d' % ((int(hi_index) + 1) / options.num_files_per_background_bin - 1,)) # merge all of the dbs from the same subbank - sqlitenode = inspiral_pipe.generic_node(toSqliteJob, dag, parent_nodes = [], - opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""}, + sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = [], + opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space(), "ilwdchar-compat":""}, input_cache_files = {"input-cache":[ce.path for ce in ce_list]}, output_files = {"database":injdb}, input_cache_file_name = os.path.basename(injdb).replace('.sqlite','.cache') ) - sqlitenode = inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode], - opts = {"sql-file":options.injection_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()}, + sqlitenode = dagparts.DAGNode(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode], + opts = {"sql-file":options.injection_sql_file, "tmp-space":dagparts.condor_scratch_space()}, input_files = {"":injdb} ) @@ -783,17 +783,17 @@ def finalize_runs(dag, lalappsRunSqliteJob, toXMLJob, ligolwInspinjFindJob, toSq parents = [] # Merge the final non injection database into chunks - noninjdb = 
inspiral_pipe.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs], '.sqlite', path = toSqliteJob.output_path) - sqlitenode = inspiral_pipe.generic_node(toSqliteJob, dag, parent_nodes = parents, - opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""}, + noninjdb = dagparts.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs], '.sqlite', path = toSqliteJob.output_path) + sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = parents, + opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space(), "ilwdchar-compat":""}, input_cache_files = {"input-cache": dbs}, output_files = {"database":noninjdb}, input_cache_file_name = os.path.basename(noninjdb).replace('.sqlite','.cache') ) # cluster the final non injection database - noninjsqlitenode = inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode], - opts = {"sql-file":options.cluster_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()}, + noninjsqlitenode = dagparts.DAGNode(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode], + opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()}, input_files = {"":noninjdb} ) chunk_nodes.append(noninjsqlitenode) @@ -807,9 +807,9 @@ def finalize_runs(dag, lalappsRunSqliteJob, toXMLJob, ligolwInspinjFindJob, toSq noninjdb = options.non_injection_db else: - noninjdb = inspiral_pipe.T050017_filename(instruments, 'ALL_LLOID', boundary_seg, '.sqlite') - sqlitenode = inspiral_pipe.generic_node(toSqliteJob, dag, parent_nodes = chunk_nodes, - opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""}, + noninjdb = dagparts.T050017_filename(instruments, 'ALL_LLOID', boundary_seg, '.sqlite') + sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = chunk_nodes, + opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space(), "ilwdchar-compat":""}, input_files = {"": (vetoes + [options.frame_segments_file])}, input_cache_files = {"input-cache": [node.input_files[""] for node in chunk_nodes]}, output_files = {"database":noninjdb}, @@ -817,12 +817,12 @@ def finalize_runs(dag, lalappsRunSqliteJob, toXMLJob, ligolwInspinjFindJob, toSq ) # cluster the final non injection database - noninjsqlitenode = inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode], - opts = {"sql-file":options.cluster_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()}, + noninjsqlitenode = dagparts.DAGNode(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode], + opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()}, input_files = {"":noninjdb} ) - cpnode = inspiral_pipe.generic_node(cpJob, dag, parent_nodes = [noninjsqlitenode], + cpnode = dagparts.DAGNode(cpJob, dag, parent_nodes = [noninjsqlitenode], input_files = {"":"%s %s" % (noninjdb, noninjdb.replace('ALL_LLOID', 'ALL_LLOID_WZL'))} ) @@ -848,20 +848,20 @@ def finalize_runs(dag, lalappsRunSqliteJob, toXMLJob, ligolwInspinjFindJob, toSq parents = [] # Setup the final output names, etc. 
- injdb = inspiral_pipe.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs], '.sqlite', path = toSqliteJob.output_path) + injdb = dagparts.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in dbs], '.sqlite', path = toSqliteJob.output_path) # merge - sqlitenode = inspiral_pipe.generic_node(toSqliteJob, dag, parent_nodes = parents, - opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""}, + sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = parents, + opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space(), "ilwdchar-compat":""}, input_cache_files = {"input-cache":dbs}, output_files = {"database":injdb}, input_cache_file_name = os.path.basename(injdb).replace('.sqlite','.cache') ) # cluster - clusternode = inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode], - opts = {"sql-file":options.cluster_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()}, + clusternode = dagparts.DAGNode(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode], + opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()}, input_files = {"":injdb} ) @@ -870,7 +870,7 @@ def finalize_runs(dag, lalappsRunSqliteJob, toXMLJob, ligolwInspinjFindJob, toSq # Setup the final output names, etc. - injdb = inspiral_pipe.T050017_filename(instruments, 'ALL_LLOID_%s' % sim_tag_from_inj_file(injections), boundary_seg, '.sqlite') + injdb = dagparts.T050017_filename(instruments, 'ALL_LLOID_%s' % sim_tag_from_inj_file(injections), boundary_seg, '.sqlite') injdbs.append(injdb) injxml = injdb.replace('.sqlite','.xml.gz') @@ -885,8 +885,8 @@ def finalize_runs(dag, lalappsRunSqliteJob, toXMLJob, ligolwInspinjFindJob, toSq xml_input = injxml # merge - sqlitenode = inspiral_pipe.generic_node(toSqliteJob, dag, parent_nodes = chunk_nodes + ligolw_add_nodes, - opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""}, + sqlitenode = dagparts.DAGNode(toSqliteJob, dag, parent_nodes = chunk_nodes + ligolw_add_nodes, + opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space(), "ilwdchar-compat":""}, input_files = {"": (vetoes + [options.frame_segments_file, injections])}, input_cache_files = {"input-cache": [node.input_files[""] for node in chunk_nodes]}, output_files = {"database":injdb}, @@ -894,30 +894,30 @@ def finalize_runs(dag, lalappsRunSqliteJob, toXMLJob, ligolwInspinjFindJob, toSq ) # cluster - clusternode = inspiral_pipe.generic_node(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode], - opts = {"sql-file":options.cluster_sql_file, "tmp-space":inspiral_pipe.condor_scratch_space()}, + clusternode = dagparts.DAGNode(lalappsRunSqliteJob, dag, parent_nodes = [sqlitenode], + opts = {"sql-file":options.cluster_sql_file, "tmp-space":dagparts.condor_scratch_space()}, input_files = {"":injdb} ) - clusternode = inspiral_pipe.generic_node(toXMLJob, dag, parent_nodes = [clusternode], - opts = {"tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""}, + clusternode = dagparts.DAGNode(toXMLJob, dag, parent_nodes = [clusternode], + opts = {"tmp-space":dagparts.condor_scratch_space(), "ilwdchar-compat":""}, output_files = {"extract":injxml}, input_files = {"database":injdb} ) - inspinjnode = inspiral_pipe.generic_node(ligolwInspinjFindJob, dag, parent_nodes = [clusternode], + inspinjnode = 
dagparts.DAGNode(ligolwInspinjFindJob, dag, parent_nodes = [clusternode], opts = {"time-window":0.9}, input_files = {"":injxml} ) - sqlitenode = inspiral_pipe.generic_node(toSqliteNoCacheJob, dag, parent_nodes = [inspinjnode], - opts = {"replace":"", "tmp-space":inspiral_pipe.condor_scratch_space(), "ilwdchar-compat":""}, + sqlitenode = dagparts.DAGNode(toSqliteNoCacheJob, dag, parent_nodes = [inspinjnode], + opts = {"replace":"", "tmp-space":dagparts.condor_scratch_space(), "ilwdchar-compat":""}, output_files = {"database":injdb}, input_files = {"":xml_input} ) - cpnode = inspiral_pipe.generic_node(cpJob, dag, parent_nodes = [sqlitenode], + cpnode = dagparts.DAGNode(cpJob, dag, parent_nodes = [sqlitenode], input_files = {"":"%s %s" % (injdb, injdb.replace('ALL_LLOID', 'ALL_LLOID_WZL'))} ) @@ -945,8 +945,8 @@ def compute_FAP(marginalizeJob, marginalizeWithZerolagJob, gstlalInspiralCompute margzerolagnodes = [] margnum = 16 for i,n in enumerate(range(0, len(margin), margnum)): - margout.append(inspiral_pipe.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in margin[n:n+margnum]], '.xml.gz', path = marginalizeJob.output_path)) - margnodes.append(inspiral_pipe.generic_node(marginalizeJob, dag, parent_nodes = parents, + margout.append(dagparts.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in margin[n:n+margnum]], '.xml.gz', path = marginalizeJob.output_path)) + margnodes.append(dagparts.DAGNode(marginalizeJob, dag, parent_nodes = parents, opts = {"marginalize":"ranking-stat-pdf"}, output_files = {"output":margout[-1]}, input_cache_files = {"likelihood-cache":margin[n:n+margnum]}, @@ -954,8 +954,8 @@ def compute_FAP(marginalizeJob, marginalizeWithZerolagJob, gstlalInspiralCompute )) if rankpdf_zerolag_nodes: - margzerolagout.append(inspiral_pipe.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in margin_zerolag[n:n+margnum]], '.xml.gz', path = marginalizeWithZerolagJob.output_path)) - margzerolagnodes.append(inspiral_pipe.generic_node(marginalizeWithZerolagJob, dag, parent_nodes = parents_zerolag, + margzerolagout.append(dagparts.group_T050017_filename_from_T050017_files([CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in margin_zerolag[n:n+margnum]], '.xml.gz', path = marginalizeWithZerolagJob.output_path)) + margzerolagnodes.append(dagparts.DAGNode(marginalizeWithZerolagJob, dag, parent_nodes = parents_zerolag, opts = {"marginalize":"ranking-stat-pdf"}, output_files = {"output":margzerolagout[-1]}, input_cache_files = {"likelihood-cache":margin_zerolag[n:n+margnum]}, @@ -971,7 +971,7 @@ def compute_FAP(marginalizeJob, marginalizeWithZerolagJob, gstlalInspiralCompute marginalized_likelihood_with_zerolag_file = options.marginalized_likelihood_with_zerolag_file else: - margnode = inspiral_pipe.generic_node(marginalizeJob, dag, parent_nodes = margnodes, + margnode = dagparts.DAGNode(marginalizeJob, dag, parent_nodes = margnodes, opts = {"marginalize":"ranking-stat-pdf"}, output_files = {"output":"marginalized_likelihood.xml.gz"}, input_cache_files = {"likelihood-cache":margout}, @@ -980,7 +980,7 @@ def compute_FAP(marginalizeJob, marginalizeWithZerolagJob, gstlalInspiralCompute parents = [margnode] + final_sqlite_nodes marginalized_likelihood_file = margnode.output_files["output"] - margnode = 
inspiral_pipe.generic_node(marginalizeWithZerolagJob, dag, parent_nodes = margzerolagnodes, + margnode = dagparts.DAGNode(marginalizeWithZerolagJob, dag, parent_nodes = margzerolagnodes, opts = {"marginalize":"ranking-stat-pdf"}, output_files = {"output":"marginalized_likelihood_with_zerolag.xml.gz"}, input_cache_files = {"likelihood-cache":margzerolagout}, @@ -990,13 +990,13 @@ def compute_FAP(marginalizeJob, marginalizeWithZerolagJob, gstlalInspiralCompute marginalized_likelihood_with_zerolag_file = margnode.output_files["output"] - farnode = inspiral_pipe.generic_node(gstlalInspiralComputeFarFromSnrChisqHistogramsJob, dag, parent_nodes = parents, - opts = {"tmp-space":inspiral_pipe.condor_scratch_space()}, + farnode = dagparts.DAGNode(gstlalInspiralComputeFarFromSnrChisqHistogramsJob, dag, parent_nodes = parents, + opts = {"tmp-space":dagparts.condor_scratch_space()}, input_files = {"background-bins-file":marginalized_likelihood_file, "injection-db":injdbs, "non-injection-db":noninjdb} ) - inspiral_pipe.generic_node(gstlalInspiralComputeFarFromSnrChisqHistogramsJob, dag, parent_nodes = parents_zerolag, - opts = {"tmp-space":inspiral_pipe.condor_scratch_space()}, + dagparts.DAGNode(gstlalInspiralComputeFarFromSnrChisqHistogramsJob, dag, parent_nodes = parents_zerolag, + opts = {"tmp-space":dagparts.condor_scratch_space()}, input_files = {"background-bins-file":marginalized_likelihood_with_zerolag_file, "injection-db":[injdb.replace('ALL_LLOID', 'ALL_LLOID_WZL') for injdb in injdbs], "non-injection-db":noninjdb.replace('ALL_LLOID', 'ALL_LLOID_WZL')} ) @@ -1151,7 +1151,7 @@ def parse_command_line(): #FIXME a hack to find the sql paths - share_path = os.path.split(inspiral_pipe.which('gstlal_inspiral'))[0].replace('bin', 'share/gstlal') + share_path = os.path.split(dagparts.which('gstlal_inspiral'))[0].replace('bin', 'share/gstlal') options.cluster_sql_file = os.path.join(share_path, 'simplify_and_cluster.sql') options.injection_sql_file = os.path.join(share_path, 'inj_simplify_and_cluster.sql') @@ -1201,7 +1201,7 @@ try: os.mkdir("logs") except: pass -dag = inspiral_pipe.DAG("trigger_pipe") +dag = dagparts.DAG("trigger_pipe") if options.max_inspiral_jobs is not None: dag.add_maxjobs_category("INSPIRAL", options.max_inspiral_jobs) @@ -1244,46 +1244,46 @@ if options.dist_stats_cache: elif options.lloid_cache: # analysis starting at merger step - marginalizeJob = inspiral_pipe.generic_job("gstlal_inspiral_marginalize_likelihood", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) - marginalizeWithZerolagJob = inspiral_pipe.generic_job("gstlal_inspiral_marginalize_likelihood", tag_base = "gstlal_inspiral_marginalize_likelihood_with_zerolag", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) + marginalizeJob = dagparts.DAGJob("gstlal_inspiral_marginalize_likelihood", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) + marginalizeWithZerolagJob = dagparts.DAGJob("gstlal_inspiral_marginalize_likelihood", tag_base = "gstlal_inspiral_marginalize_likelihood_with_zerolag", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) else: # set up jobs only needed for zerolag run - 
refPSDJob = inspiral_pipe.generic_job("gstlal_reference_psd", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "request_cpus":"2", "want_graceful_removal":"True", "kill_sig":"15"})) - medianPSDJob = inspiral_pipe.generic_job("gstlal_median_of_psds", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) - plotBanksJob = inspiral_pipe.generic_job("gstlal_inspiral_plot_banks", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) - svdJob = inspiral_pipe.generic_job("gstlal_svd_bank", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"7GB", "want_graceful_removal":"True", "kill_sig":"15"})) - modelJob = inspiral_pipe.generic_job("gstlal_inspiral_mass_model", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) - modelAddJob = inspiral_pipe.generic_job("gstlal_inspiral_add_mass_models", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) - horizonJob = inspiral_pipe.generic_job("gstlal_plot_psd_horizon", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) - gstlalInspiralJob = inspiral_pipe.generic_job(options.inspiral_executable, condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "want_graceful_removal":"True", "kill_sig":"15"})) - createPriorDistStatsJob = inspiral_pipe.generic_job("gstlal_inspiral_create_prior_diststats", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) - calcRankPDFsJob = inspiral_pipe.generic_job("gstlal_inspiral_calc_rank_pdfs", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "request_cpus":"4", "want_graceful_removal":"True", "kill_sig":"15"})) - calcRankPDFsWithZerolagJob = inspiral_pipe.generic_job("gstlal_inspiral_calc_rank_pdfs", tag_base = "gstlal_inspiral_calc_rank_pdfs_with_zerolag", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "request_cpus":"4", "want_graceful_removal":"True", "kill_sig":"15"})) - calcLikelihoodJob = inspiral_pipe.generic_job("gstlal_inspiral_calc_likelihood", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) - marginalizeJob = inspiral_pipe.generic_job("gstlal_inspiral_marginalize_likelihood", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) - marginalizeWithZerolagJob = inspiral_pipe.generic_job("gstlal_inspiral_marginalize_likelihood", tag_base = "gstlal_inspiral_marginalize_likelihood_with_zerolag", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", 
"want_graceful_removal":"True", "kill_sig":"15"})) - -gstlalInspiralInjJob = inspiral_pipe.generic_job(options.inspiral_executable, tag_base="gstlal_inspiral_inj", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "want_graceful_removal":"True", "kill_sig":"15"})) -injSplitterJob = inspiral_pipe.generic_job("gstlal_injsplitter", tag_base="gstlal_injsplitter", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) -gstlalInjSnrJob = inspiral_pipe.generic_job("gstlal_inspiral_injection_snr", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"2GB", "request_cpus":"2", "want_graceful_removal":"True", "kill_sig":"15"})) -ligolwAddJob = inspiral_pipe.generic_job("ligolw_add", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) -calcLikelihoodJobInj = inspiral_pipe.generic_job("gstlal_inspiral_calc_likelihood", tag_base='gstlal_inspiral_calc_likelihood_inj', condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) -gstlalInspiralComputeFarFromSnrChisqHistogramsJob = inspiral_pipe.generic_job("gstlal_compute_far_from_snr_chisq_histograms", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) -ligolwInspinjFindJob = inspiral_pipe.generic_job("lalapps_inspinjfind", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) -toSqliteJob = inspiral_pipe.generic_job("ligolw_sqlite", tag_base = "ligolw_sqlite_from_xml", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) -toSqliteNoCacheJob = inspiral_pipe.generic_job("ligolw_sqlite", tag_base = "ligolw_sqlite_from_xml_inj_final", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) -toXMLJob = inspiral_pipe.generic_job("ligolw_sqlite", tag_base = "ligolw_sqlite_to_xml", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) -lalappsRunSqliteJob = inspiral_pipe.generic_job("lalapps_run_sqlite", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) -plotSummaryJob = inspiral_pipe.generic_job("gstlal_inspiral_plotsummary", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) -plotSummaryIsolatePrecessionJob = inspiral_pipe.generic_job("gstlal_inspiral_plotsummary", tag_base = "gstlal_inspiral_plotsummary_isolated_precession", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) -plotIndividualInjectionsSummaryJob = 
inspiral_pipe.generic_job("gstlal_inspiral_plotsummary", tag_base = "gstlal_inspiral_plotsummary_inj", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) -plotIndividualInjectionsSummaryIsolatePrecessionJob = inspiral_pipe.generic_job("gstlal_inspiral_plotsummary", tag_base = "gstlal_inspiral_plotsummary_isolated_precession_inj", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) -plotSensitivityJob = inspiral_pipe.generic_job("gstlal_inspiral_plot_sensitivity", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) -pageJob = inspiral_pipe.generic_job("gstlal_inspiral_summary_page", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) -plotbackgroundJob = inspiral_pipe.generic_job("gstlal_inspiral_plot_background", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) -cpJob = inspiral_pipe.generic_job("cp", tag_base = "cp", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"want_graceful_removal":"True", "kill_sig":"15"})) -rmJob = inspiral_pipe.generic_job("rm", tag_base = "rm_intermediate_merger_products", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"want_graceful_removal":"True", "kill_sig":"15"})) + refPSDJob = dagparts.DAGJob("gstlal_reference_psd", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "request_cpus":"2", "want_graceful_removal":"True", "kill_sig":"15"})) + medianPSDJob = dagparts.DAGJob("gstlal_median_of_psds", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) + plotBanksJob = dagparts.DAGJob("gstlal_inspiral_plot_banks", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) + svdJob = dagparts.DAGJob("gstlal_svd_bank", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"7GB", "want_graceful_removal":"True", "kill_sig":"15"})) + modelJob = dagparts.DAGJob("gstlal_inspiral_mass_model", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) + modelAddJob = dagparts.DAGJob("gstlal_inspiral_add_mass_models", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) + horizonJob = dagparts.DAGJob("gstlal_plot_psd_horizon", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) + gstlalInspiralJob = dagparts.DAGJob(options.inspiral_executable, condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "want_graceful_removal":"True", "kill_sig":"15"})) + 
createPriorDistStatsJob = dagparts.DAGJob("gstlal_inspiral_create_prior_diststats", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) + calcRankPDFsJob = dagparts.DAGJob("gstlal_inspiral_calc_rank_pdfs", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "request_cpus":"4", "want_graceful_removal":"True", "kill_sig":"15"})) + calcRankPDFsWithZerolagJob = dagparts.DAGJob("gstlal_inspiral_calc_rank_pdfs", tag_base = "gstlal_inspiral_calc_rank_pdfs_with_zerolag", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "request_cpus":"4", "want_graceful_removal":"True", "kill_sig":"15"})) + calcLikelihoodJob = dagparts.DAGJob("gstlal_inspiral_calc_likelihood", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) + marginalizeJob = dagparts.DAGJob("gstlal_inspiral_marginalize_likelihood", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) + marginalizeWithZerolagJob = dagparts.DAGJob("gstlal_inspiral_marginalize_likelihood", tag_base = "gstlal_inspiral_marginalize_likelihood_with_zerolag", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) + +gstlalInspiralInjJob = dagparts.DAGJob(options.inspiral_executable, tag_base="gstlal_inspiral_inj", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "want_graceful_removal":"True", "kill_sig":"15"})) +injSplitterJob = dagparts.DAGJob("gstlal_injsplitter", tag_base="gstlal_injsplitter", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) +gstlalInjSnrJob = dagparts.DAGJob("gstlal_inspiral_injection_snr", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"2GB", "request_cpus":"2", "want_graceful_removal":"True", "kill_sig":"15"})) +ligolwAddJob = dagparts.DAGJob("ligolw_add", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) +calcLikelihoodJobInj = dagparts.DAGJob("gstlal_inspiral_calc_likelihood", tag_base='gstlal_inspiral_calc_likelihood_inj', condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) +gstlalInspiralComputeFarFromSnrChisqHistogramsJob = dagparts.DAGJob("gstlal_compute_far_from_snr_chisq_histograms", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) +ligolwInspinjFindJob = dagparts.DAGJob("lalapps_inspinjfind", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) +toSqliteJob = dagparts.DAGJob("ligolw_sqlite", tag_base = "ligolw_sqlite_from_xml", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", 
"want_graceful_removal":"True", "kill_sig":"15"})) +toSqliteNoCacheJob = dagparts.DAGJob("ligolw_sqlite", tag_base = "ligolw_sqlite_from_xml_inj_final", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) +toXMLJob = dagparts.DAGJob("ligolw_sqlite", tag_base = "ligolw_sqlite_to_xml", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) +lalappsRunSqliteJob = dagparts.DAGJob("lalapps_run_sqlite", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) +plotSummaryJob = dagparts.DAGJob("gstlal_inspiral_plotsummary", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) +plotSummaryIsolatePrecessionJob = dagparts.DAGJob("gstlal_inspiral_plotsummary", tag_base = "gstlal_inspiral_plotsummary_isolated_precession", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) +plotIndividualInjectionsSummaryJob = dagparts.DAGJob("gstlal_inspiral_plotsummary", tag_base = "gstlal_inspiral_plotsummary_inj", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) +plotIndividualInjectionsSummaryIsolatePrecessionJob = dagparts.DAGJob("gstlal_inspiral_plotsummary", tag_base = "gstlal_inspiral_plotsummary_isolated_precession_inj", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) +plotSensitivityJob = dagparts.DAGJob("gstlal_inspiral_plot_sensitivity", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) +pageJob = dagparts.DAGJob("gstlal_inspiral_summary_page", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) +plotbackgroundJob = dagparts.DAGJob("gstlal_inspiral_plot_background", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"request_memory":"1GB", "want_graceful_removal":"True", "kill_sig":"15"})) +cpJob = dagparts.DAGJob("cp", tag_base = "cp", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"want_graceful_removal":"True", "kill_sig":"15"})) +rmJob = dagparts.DAGJob("rm", tag_base = "rm_intermediate_merger_products", condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"want_graceful_removal":"True", "kill_sig":"15"})) # # Get mchirp boundaries of banks, maximum duration of templates, and analysis segments @@ -1326,10 +1326,10 @@ elif options.reference_psd is None: # plot the horizon distance # - inspiral_pipe.generic_node(horizonJob, dag, + dagparts.DAGNode(horizonJob, dag, parent_nodes = psd_nodes.values(), input_files = {"":[node.output_files["write-psd"] for node in psd_nodes.values()]}, - output_files = {"":inspiral_pipe.T050017_filename(instruments, "HORIZON", boundary_seg, '.png', path = output_dir)} + output_files = {"":dagparts.T050017_filename(instruments, "HORIZON", boundary_seg, 
'.png', path = output_dir)} ) # @@ -1338,10 +1338,10 @@ elif options.reference_psd is None: # FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache median_psd_node = \ - inspiral_pipe.generic_node(medianPSDJob, dag, + dagparts.DAGNode(medianPSDJob, dag, parent_nodes = psd_nodes.values(), input_files = {"input-cache": "reference_psd.cache"}, - output_files = {"output-name": inspiral_pipe.T050017_filename(instruments, "REFERENCE_PSD", boundary_seg, '.xml.gz', path = subdir_path([medianPSDJob.output_path, str(int(boundary_seg[0]))[:5]]))} + output_files = {"output-name": dagparts.T050017_filename(instruments, "REFERENCE_PSD", boundary_seg, '.xml.gz', path = subdir_path([medianPSDJob.output_path, str(int(boundary_seg[0]))[:5]]))} ) ref_psd = median_psd_node.output_files["output-name"] @@ -1368,7 +1368,7 @@ if not options.lloid_cache and not options.disable_calc_inj_snr: for inj in options.injections: inj_snr_nodes = [] - inj_splitter_node = inspiral_pipe.generic_node(injSplitterJob, dag, parent_nodes=[], + inj_splitter_node = dagparts.DAGNode(injSplitterJob, dag, parent_nodes=[], opts = {"output-path":injSplitterJob.output_path, "usertag": sim_tag_from_inj_file(inj.split(":")[-1]), "nsplit": num_split_inj_snr_jobs}, input_files = {"": inj.split(":")[-1]} ) @@ -1376,7 +1376,7 @@ if not options.lloid_cache and not options.disable_calc_inj_snr: # FIXME Use machinery in inspiral_pipe.py to create reference_psd.cache for i in xrange(num_split_inj_snr_jobs): - injSNRnode = inspiral_pipe.generic_node(gstlalInjSnrJob, dag, parent_nodes=ref_psd_parent_nodes + [inj_splitter_node], + injSNRnode = dagparts.DAGNode(gstlalInjSnrJob, dag, parent_nodes=ref_psd_parent_nodes + [inj_splitter_node], opts = {"flow":options.flow,"fmax":options.fmax}, input_files = {"injection-file": "%s/%s_INJ_SPLIT_%04d.xml" % (injSplitterJob.output_path, sim_tag_from_inj_file(inj.split(":")[-1]), i), "reference-psd-cache": "reference_psd.cache" } @@ -1384,7 +1384,7 @@ if not options.lloid_cache and not options.disable_calc_inj_snr: injSNRnode.set_priority(98) inj_snr_nodes.append(injSNRnode) - ligolw_add_nodes.append(inspiral_pipe.generic_node(ligolwAddJob, dag, parent_nodes=inj_snr_nodes, + ligolw_add_nodes.append(dagparts.DAGNode(ligolwAddJob, dag, parent_nodes=inj_snr_nodes, input_files = {"": ' '.join(["%s/%s_INJ_SPLIT_%04d.xml" % (injSplitterJob.output_path, sim_tag_from_inj_file(inj.split(":")[-1]), i) for i in xrange(num_split_inj_snr_jobs)])}, opts = {"ilwdchar-compat":""}, output_files = {"output": inj.split(":")[-1]} @@ -1439,44 +1439,44 @@ farnode, margfiles_to_delete = compute_FAP(marginalizeJob, marginalizeWithZerola # make summary plots plotnodes = [] -plotnodes.append(inspiral_pipe.generic_node(plotSummaryJob, dag, parent_nodes=[farnode], - opts = {"segments-name": options.frame_segments_name, "tmp-space": inspiral_pipe.condor_scratch_space(), "user-tag": "ALL_LLOID_COMBINED", "output-dir": output_dir, "likelihood-file":"post_marginalized_likelihood.xml.gz", "shrink-data-segments": 32.0, "extend-veto-segments": 8., "remove-precession": ""}, +plotnodes.append(dagparts.DAGNode(plotSummaryJob, dag, parent_nodes=[farnode], + opts = {"segments-name": options.frame_segments_name, "tmp-space": dagparts.condor_scratch_space(), "user-tag": "ALL_LLOID_COMBINED", "output-dir": output_dir, "likelihood-file":"post_marginalized_likelihood.xml.gz", "shrink-data-segments": 32.0, "extend-veto-segments": 8., "remove-precession": ""}, input_files = {"":[noninjdb] + injdbs} )) 
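# Example (illustration, not part of the patch): the construction pattern the
# converted pipelines now follow, assuming the dagparts module provides DAG,
# DAGJob, DAGNode, condor_command_dict_from_opts and T050017_filename with the
# same behaviour as the inspiral_pipe helpers deleted further down in this
# patch.  The option names are ones used elsewhere in the patch; the GPS
# times and instrument string are made up for illustration.
from gstlal import dagparts

dag = dagparts.DAG("example_pipe")
condor_commands = dagparts.condor_command_dict_from_opts(
	[], {"request_memory": "1GB", "request_cpus": "2", "want_graceful_removal": "True", "kill_sig": "15"})
psd_job = dagparts.DAGJob("gstlal_reference_psd", condor_commands = condor_commands)
seg = (1000000000, 1000004096)	# hypothetical analysis segment
psd_node = dagparts.DAGNode(psd_job, dag, parent_nodes = [],
	opts = {"gps-start-time": seg[0], "gps-end-time": seg[1]},
	output_files = {"write-psd": dagparts.T050017_filename(
		"H1L1", "REFERENCE_PSD", seg, ".xml.gz", path = psd_job.output_path)})
# the usual pipeline.CondorDAG write-out, plus the cache written by DAG
dag.write_sub_files()
dag.write_dag()
dag.write_cache()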
-plotnodes.append(inspiral_pipe.generic_node(plotSummaryIsolatePrecessionJob, dag, parent_nodes=[farnode], - opts = {"segments-name": options.frame_segments_name, "tmp-space": inspiral_pipe.condor_scratch_space(), "user-tag": "PRECESSION_LLOID_COMBINED", "plot-group":1, "output-dir": output_dir, "likelihood-file":"post_marginalized_likelihood.xml.gz", "shrink-data-segments": 32.0, "extend-veto-segments": 8., "isolate-precession": ""}, +plotnodes.append(dagparts.DAGNode(plotSummaryIsolatePrecessionJob, dag, parent_nodes=[farnode], + opts = {"segments-name": options.frame_segments_name, "tmp-space": dagparts.condor_scratch_space(), "user-tag": "PRECESSION_LLOID_COMBINED", "plot-group":1, "output-dir": output_dir, "likelihood-file":"post_marginalized_likelihood.xml.gz", "shrink-data-segments": 32.0, "extend-veto-segments": 8., "isolate-precession": ""}, input_files = {"":[noninjdb] + injdbs} )) for injdb in injdbs: - plotnodes.append(inspiral_pipe.generic_node(plotIndividualInjectionsSummaryJob, dag, parent_nodes=[farnode], - opts = {"segments-name": options.frame_segments_name, "tmp-space":inspiral_pipe.condor_scratch_space(), "user-tag":injdb.replace(".sqlite","").split("-")[1], "plot-group":1, "output-dir":output_dir, "likelihood-file":"post_marginalized_likelihood.xml.gz", "shrink-data-segments": 32.0, "extend-veto-segments": 8., "remove-precession": ""}, + plotnodes.append(dagparts.DAGNode(plotIndividualInjectionsSummaryJob, dag, parent_nodes=[farnode], + opts = {"segments-name": options.frame_segments_name, "tmp-space":dagparts.condor_scratch_space(), "user-tag":injdb.replace(".sqlite","").split("-")[1], "plot-group":1, "output-dir":output_dir, "likelihood-file":"post_marginalized_likelihood.xml.gz", "shrink-data-segments": 32.0, "extend-veto-segments": 8., "remove-precession": ""}, input_files = {"":[noninjdb] + [injdb]} )) - plotnodes.append(inspiral_pipe.generic_node(plotIndividualInjectionsSummaryIsolatePrecessionJob, dag, parent_nodes=[farnode], - opts = {"segments-name": options.frame_segments_name, "tmp-space":inspiral_pipe.condor_scratch_space(), "user-tag": injdb.replace(".sqlite","").split("-")[1].replace("ALL_LLOID","PRECESSION_LLOID"), "plot-group":1, "output-dir":output_dir, "likelihood-file":"post_marginalized_likelihood.xml.gz", "shrink-data-segments": 32.0, "extend-veto-segments": 8., "isolate-precession": ""}, + plotnodes.append(dagparts.DAGNode(plotIndividualInjectionsSummaryIsolatePrecessionJob, dag, parent_nodes=[farnode], + opts = {"segments-name": options.frame_segments_name, "tmp-space":dagparts.condor_scratch_space(), "user-tag": injdb.replace(".sqlite","").split("-")[1].replace("ALL_LLOID","PRECESSION_LLOID"), "plot-group":1, "output-dir":output_dir, "likelihood-file":"post_marginalized_likelihood.xml.gz", "shrink-data-segments": 32.0, "extend-veto-segments": 8., "isolate-precession": ""}, input_files = {"":[noninjdb] + [injdb]} )) # make sensitivity plots -plotnodes.append(inspiral_pipe.generic_node(plotSensitivityJob, dag, parent_nodes=[farnode], - opts = {"user-tag":"ALL_LLOID_COMBINED", "output-dir":output_dir, "tmp-space":inspiral_pipe.condor_scratch_space(), "veto-segments-name":"vetoes", "bin-by-source-type":"", "dist-bins":200, "data-segments-name":"datasegments"}, +plotnodes.append(dagparts.DAGNode(plotSensitivityJob, dag, parent_nodes=[farnode], + opts = {"user-tag":"ALL_LLOID_COMBINED", "output-dir":output_dir, "tmp-space":dagparts.condor_scratch_space(), "veto-segments-name":"vetoes", "bin-by-source-type":"", "dist-bins":200, 
"data-segments-name":"datasegments"}, input_files = {"zero-lag-database":noninjdb, "":injdbs} )) for injdb in injdbs: - plotnodes.append(inspiral_pipe.generic_node(plotSensitivityJob, dag, parent_nodes=[farnode], - opts = {"user-tag":injdb.replace(".sqlite","").split("-")[1], "output-dir":output_dir, "tmp-space":inspiral_pipe.condor_scratch_space(), "veto-segments-name":"vetoes", "bin-by-source-type":"", "dist-bins":200, "data-segments-name":"datasegments"}, + plotnodes.append(dagparts.DAGNode(plotSensitivityJob, dag, parent_nodes=[farnode], + opts = {"user-tag":injdb.replace(".sqlite","").split("-")[1], "output-dir":output_dir, "tmp-space":dagparts.condor_scratch_space(), "veto-segments-name":"vetoes", "bin-by-source-type":"", "dist-bins":200, "data-segments-name":"datasegments"}, input_files = {"zero-lag-database":noninjdb, "":injdb} )) # make background plots -plotnodes.append(inspiral_pipe.generic_node(plotbackgroundJob, dag, parent_nodes = [farnode], opts = {"user-tag":"ALL_LLOID_COMBINED", "output-dir":output_dir}, input_files = {"":"post_marginalized_likelihood.xml.gz", "database":noninjdb})) +plotnodes.append(dagparts.DAGNode(plotbackgroundJob, dag, parent_nodes = [farnode], opts = {"user-tag":"ALL_LLOID_COMBINED", "output-dir":output_dir}, input_files = {"":"post_marginalized_likelihood.xml.gz", "database":noninjdb})) # make a web page -inspiral_pipe.generic_node(pageJob, dag, parent_nodes = plotnodes, +dagparts.DAGNode(pageJob, dag, parent_nodes = plotnodes, opts = {"title":"gstlal-%d-%d-closed-box" % (int(boundary_seg[0]), int(boundary_seg[1])), "webserver-dir":options.web_dir, "glob-path":output_dir, "output-user-tag":["ALL_LLOID_COMBINED", "PRECESSION_LLOID_COMBINED"] + [injdb.replace(".sqlite","").split("-")[1] for injdb in injdbs] + [injdb.replace(".sqlite","").split("-")[1].replace("ALL_LLOID", "PRECESSION_LLOID") for injdb in injdbs]} ) @@ -1484,12 +1484,12 @@ inspiral_pipe.generic_node(pageJob, dag, parent_nodes = plotnodes, # rm intermediate merger products # for db in dbs_to_delete: - inspiral_pipe.generic_node(rmJob, dag, parent_nodes = plotnodes, + dagparts.DAGNode(rmJob, dag, parent_nodes = plotnodes, input_files = {"": db} ) for margfile in margfiles_to_delete: - inspiral_pipe.generic_node(rmJob, dag, parent_nodes = plotnodes, + dagparts.DAGNode(rmJob, dag, parent_nodes = plotnodes, input_files = {"": margfile} ) diff --git a/gstlal-inspiral/bin/gstlal_inspiral_plot_background b/gstlal-inspiral/bin/gstlal_inspiral_plot_background index 845f888363ae7fb7c37f923c96e03a151288eb68..9703d5ede1c69d4dcb6be011375215d05fb914b4 100755 --- a/gstlal-inspiral/bin/gstlal_inspiral_plot_background +++ b/gstlal-inspiral/bin/gstlal_inspiral_plot_background @@ -61,7 +61,7 @@ from ligo.segments import utils as segmentsUtils from gstlal import far from gstlal import plotfar -from gstlal import inspiral_pipe +from gstlal import dagparts class SnrChiColourNorm(matplotlib.colors.Normalize): @@ -334,14 +334,14 @@ for bin_index, rankingstat in enumerate(sorted(rankingstats.values(), key = lamb fig = plotfar.plot_snr_chi_pdf(rankingstat, instrument, snr_chi_type, options.max_snr, sngls = sngls) if fig is None: continue - plotname = inspiral_pipe.T050017_filename(instrument, "GSTLAL_INSPIRAL_PLOT_BACKGROUND_%s_%04d_%s_SNRCHI2" % (options.user_tag, bin_index, snr_chi_type.upper()), seg, options.output_format, path = options.output_dir) + plotname = dagparts.T050017_filename(instrument, "GSTLAL_INSPIRAL_PLOT_BACKGROUND_%s_%04d_%s_SNRCHI2" % (options.user_tag, bin_index, snr_chi_type.upper()), 
seg, options.output_format, path = options.output_dir) if options.verbose: print >>sys.stderr, "writing %s" % plotname fig.savefig(plotname) # candidate rates fig = plotfar.plot_rates(rankingstat) - plotname = inspiral_pipe.T050017_filename("H1L1V1", "GSTLAL_INSPIRAL_PLOT_BACKGROUND_%s_%04d_RATES" % (options.user_tag, bin_index), seg, options.output_format, path = options.output_dir) + plotname = dagparts.T050017_filename("H1L1V1", "GSTLAL_INSPIRAL_PLOT_BACKGROUND_%s_%04d_RATES" % (options.user_tag, bin_index), seg, options.output_format, path = options.output_dir) if options.verbose: print >>sys.stderr, "writing %s" % plotname fig.savefig(plotname) @@ -358,7 +358,7 @@ if options.plot_snr_snr_pdfs: horizon_distances = rankingstat.numerator.SNRPDF.quantized_horizon_distances(horizon_distances) fig = plotfar.plot_snr_joint_pdf(rankingstat.numerator.SNRPDF, instruments, horizon_distances, rankingstat.min_instruments, options.max_snr, sngls = sngls) if fig is not None: - plotname = inspiral_pipe.T050017_filename(instruments, "GSTLAL_INSPIRAL_PLOT_BACKGROUND_%s_SNR_PDF_%s" % (options.user_tag, "_".join(["%s_%s" % (k, horizon_distances[k]) for k in sorted(horizon_distances)]) ), seg, options.output_format, path = options.output_dir) + plotname = dagparts.T050017_filename(instruments, "GSTLAL_INSPIRAL_PLOT_BACKGROUND_%s_SNR_PDF_%s" % (options.user_tag, "_".join(["%s_%s" % (k, horizon_distances[k]) for k in sorted(horizon_distances)]) ), seg, options.output_format, path = options.output_dir) if options.verbose: print >>sys.stderr, "writing %s" % plotname fig.savefig(plotname) @@ -368,7 +368,7 @@ if options.plot_snr_snr_pdfs: if rankingstatpdf is not None: for Title, which, NAME in (("Noise", "noise", "NOISE"), ("Signal", "signal", "SIGNAL")): fig = plotfar.plot_likelihood_ratio_pdf(rankingstatpdf, (options.min_log_lambda, options.max_log_lambda), Title, which = which) - plotname = inspiral_pipe.T050017_filename("COMBINED", "GSTLAL_INSPIRAL_PLOT_BACKGROUND_%s_%s_LIKELIHOOD_RATIO_PDF" % (options.user_tag, NAME), seg, options.output_format, path = options.output_dir) + plotname = dagparts.T050017_filename("COMBINED", "GSTLAL_INSPIRAL_PLOT_BACKGROUND_%s_%s_LIKELIHOOD_RATIO_PDF" % (options.user_tag, NAME), seg, options.output_format, path = options.output_dir) if options.verbose: print >>sys.stderr, "writing %s" % plotname fig.savefig(plotname) @@ -382,13 +382,13 @@ if rankingstatpdf is not None and rankingstatpdf.zero_lag_lr_lnpdf.array.any(): else: xhi = options.max_log_lambda fig = plotfar.plot_likelihood_ratio_ccdf(fapfar, (options.min_log_lambda, xhi), observed_ln_likelihood_ratios = zerolag_ln_lr, is_open_box = True) - plotname = inspiral_pipe.T050017_filename("COMBINED", "GSTLAL_INSPIRAL_PLOT_BACKGROUND_%s_NOISE_LIKELIHOOD_RATIO_CCDF_openbox" % options.user_tag, seg, options.output_format, path = options.output_dir) + plotname = dagparts.T050017_filename("COMBINED", "GSTLAL_INSPIRAL_PLOT_BACKGROUND_%s_NOISE_LIKELIHOOD_RATIO_CCDF_openbox" % options.user_tag, seg, options.output_format, path = options.output_dir) if options.verbose: print >>sys.stderr, "writing %s" % plotname fig.savefig(plotname) fig = plotfar.plot_likelihood_ratio_ccdf(fapfar, (options.min_log_lambda, xhi), observed_ln_likelihood_ratios = timeslide_ln_lr, is_open_box = False) - plotname = inspiral_pipe.T050017_filename("COMBINED", "GSTLAL_INSPIRAL_PLOT_BACKGROUND_%s_NOISE_LIKELIHOOD_RATIO_CCDF_closedbox" % options.user_tag, seg, options.output_format, path = options.output_dir) + plotname = dagparts.T050017_filename("COMBINED", 
"GSTLAL_INSPIRAL_PLOT_BACKGROUND_%s_NOISE_LIKELIHOOD_RATIO_CCDF_closedbox" % options.user_tag, seg, options.output_format, path = options.output_dir) if options.verbose: print >>sys.stderr, "writing %s" % plotname fig.savefig(plotname) diff --git a/gstlal-inspiral/bin/gstlal_inspiral_plot_banks b/gstlal-inspiral/bin/gstlal_inspiral_plot_banks index 3d1d0e4bb2f467edc4e87da6486097b57e22de23..51a4a65a17deb6d3f4b72038840b2fadbad93957 100755 --- a/gstlal-inspiral/bin/gstlal_inspiral_plot_banks +++ b/gstlal-inspiral/bin/gstlal_inspiral_plot_banks @@ -55,7 +55,7 @@ from glue.ligolw import utils as ligolw_utils from glue.lal import CacheEntry from gstlal import svd_bank -from gstlal import inspiral_pipe +from gstlal import dagparts from gstlal.plotutil import golden_ratio # @@ -348,7 +348,7 @@ lsctables.use_in(LIGOLWContentHandler) options, filenames = parse_command_line() -filename_template = inspiral_pipe.T050017_filename('H1L1V1', 'GSTLAL_INSPIRAL_PLOTBANKS_%s', (0, 0), '.png', path = options.output_dir) +filename_template = dagparts.T050017_filename('H1L1V1', 'GSTLAL_INSPIRAL_PLOTBANKS_%s', (0, 0), '.png', path = options.output_dir) # Make svd bank plots if options.plot_svd_bank: diff --git a/gstlal-inspiral/bin/gstlal_inspiral_plot_sensitivity b/gstlal-inspiral/bin/gstlal_inspiral_plot_sensitivity index 6bcf8394693a3190b9b4d07b9e82fa5e7b14bf18..bc27c1b84f1006991a00629b2260bf141153b736 100755 --- a/gstlal-inspiral/bin/gstlal_inspiral_plot_sensitivity +++ b/gstlal-inspiral/bin/gstlal_inspiral_plot_sensitivity @@ -32,7 +32,7 @@ import sys from optparse import OptionParser import itertools -from gstlal import inspiral_pipe +from gstlal import dagparts from gstlal import plotutil from glue.ligolw import ligolw @@ -711,19 +711,19 @@ for bin_type in opts.bin_types: # save and close figures ifostr = "%s_%s" % (UL.searched_instruments, "".join(sorted(instr))) - tag = inspiral_pipe.T050017_filename(ifostr, "GSTLAL_INSPIRAL_PLOT_SENSITIVITY_%s_VOLUME_VS_FAR_BINNED_BY_%s" % (opts.user_tag, bin_type.upper()), (UL.start_time, UL.end_time), ".png", path = opts.output_dir) + tag = dagparts.T050017_filename(ifostr, "GSTLAL_INSPIRAL_PLOT_SENSITIVITY_%s_VOLUME_VS_FAR_BINNED_BY_%s" % (opts.user_tag, bin_type.upper()), (UL.start_time, UL.end_time), ".png", path = opts.output_dir) fig_far.savefig(tag) pyplot.close(fig_far) - tag = inspiral_pipe.T050017_filename(ifostr, "GSTLAL_INSPIRAL_PLOT_SENSITIVITY_%s_RANGE_VS_FAR_BINNED_BY_%s" % (opts.user_tag, bin_type.upper()), (UL.start_time, UL.end_time), ".png", path = opts.output_dir) + tag = dagparts.T050017_filename(ifostr, "GSTLAL_INSPIRAL_PLOT_SENSITIVITY_%s_RANGE_VS_FAR_BINNED_BY_%s" % (opts.user_tag, bin_type.upper()), (UL.start_time, UL.end_time), ".png", path = opts.output_dir) fig_far_range.savefig(tag) pyplot.close(fig_far_range) - tag = inspiral_pipe.T050017_filename(ifostr, "GSTLAL_INSPIRAL_PLOT_SENSITIVITY_%s_VOLUME_VS_SNR_BINNED_BY_%s" % (opts.user_tag, bin_type.upper()), (UL.start_time, UL.end_time), ".png", path = opts.output_dir) + tag = dagparts.T050017_filename(ifostr, "GSTLAL_INSPIRAL_PLOT_SENSITIVITY_%s_VOLUME_VS_SNR_BINNED_BY_%s" % (opts.user_tag, bin_type.upper()), (UL.start_time, UL.end_time), ".png", path = opts.output_dir) fig_snr.savefig(tag) pyplot.close(fig_snr) - tag = inspiral_pipe.T050017_filename(ifostr, "GSTLAL_INSPIRAL_PLOT_SENSITIVITY_%s_EFFICIENCY_BINNED_BY_%s" % (opts.user_tag, bin_type.upper()), (UL.start_time, UL.end_time), ".png", path = opts.output_dir) + tag = dagparts.T050017_filename(ifostr, 
"GSTLAL_INSPIRAL_PLOT_SENSITIVITY_%s_EFFICIENCY_BINNED_BY_%s" % (opts.user_tag, bin_type.upper()), (UL.start_time, UL.end_time), ".png", path = opts.output_dir) fig_eff.savefig(tag) pyplot.close(fig_eff) diff --git a/gstlal-inspiral/bin/gstlal_inspiral_plotsummary b/gstlal-inspiral/bin/gstlal_inspiral_plotsummary index 0e92b9f999e30e05dc7e84788db3463ee07b939e..00e82b82760a4cb13eb0b890d13eddfc94f2f933 100755 --- a/gstlal-inspiral/bin/gstlal_inspiral_plotsummary +++ b/gstlal-inspiral/bin/gstlal_inspiral_plotsummary @@ -63,7 +63,7 @@ from glue.ligolw import dbtables from glue.ligolw import lsctables from glue.ligolw.utils import segments as ligolw_segments from gstlal import far -from gstlal import inspiral_pipe +from gstlal import dagparts from gstlal import gviz_api from gstlal.plotutil import golden_ratio @@ -1796,7 +1796,7 @@ seg_class.finish() # -filename_template = inspiral_pipe.T050017_filename("H1L1V1", "GSTLAL_INSPIRAL_PLOTSUMMARY_%s_%02d_%s_%s", contents.seglists.extent_all(), "%s", path = options.output_dir) +filename_template = dagparts.T050017_filename("H1L1V1", "GSTLAL_INSPIRAL_PLOTSUMMARY_%s_%02d_%s_%s", contents.seglists.extent_all(), "%s", path = options.output_dir) while len(plots): plot_group, plot = plots.pop(0) for fig, filename_fragment, is_open_box in plot.finish(): diff --git a/gstlal-inspiral/bin/gstlal_inspiral_svd_bank_pipe b/gstlal-inspiral/bin/gstlal_inspiral_svd_bank_pipe index 62df9ac3945f8a19c5717483e102518225c6a750..6bb96dc1bbe7ea54bb9cde9cdbdd971475915829 100755 --- a/gstlal-inspiral/bin/gstlal_inspiral_svd_bank_pipe +++ b/gstlal-inspiral/bin/gstlal_inspiral_svd_bank_pipe @@ -34,6 +34,7 @@ from optparse import OptionParser # from ligo import segments +from gstlal import dagparts from gstlal import inspiral_pipe from gstlal import far from lal.utils import CacheEntry @@ -140,8 +141,8 @@ try: os.mkdir("logs") except: pass -dag = inspiral_pipe.DAG(options.output_name) -svdJob = inspiral_pipe.generic_job("gstlal_svd_bank", tag_base = "gstlal_svd_bank_%s" % ifo, condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"want_graceful_removal":"True", "kill_sig":"15"})) +dag = dagparts.DAG(options.output_name) +svdJob = dagparts.DAGJob("gstlal_svd_bank", tag_base = "gstlal_svd_bank_%s" % ifo, condor_commands = dagparts.condor_command_dict_from_opts(options.condor_command, {"want_graceful_removal":"True", "kill_sig":"15"})) # Assumes cache is sorted by chirpmass or whatever the SVD sorting algorithm that was chosen files = [CacheEntry(line).path for line in open(options.bank_cache)] @@ -157,11 +158,11 @@ for i, f in enumerate(groups): clipleft = [options.overlap / 2] * len(f) # overlap must be even clipright = [options.overlap / 2] * len(f) # overlap must be even bank_ids = range(bank_ids[-1] + 1, bank_ids[-1] + 1 + len(f)) - svd_bank_name = inspiral_pipe.T050017_filename(ifo, "GSTLAL_SVD_BANK_%d" % i, (0, 0), ".xml.gz", path = svdJob.output_path) + svd_bank_name = dagparts.T050017_filename(ifo, "GSTLAL_SVD_BANK_%d" % i, (0, 0), ".xml.gz", path = svdJob.output_path) svd_bank_name = os.path.join(os.getcwd(), svd_bank_name) dag.output_cache.append(CacheEntry(ifo, "GSTLAL_SVD_BANK_%d" % i, segments.segment(0, 0), "file://localhost%s" % (svd_bank_name,))) - svdNode = inspiral_pipe.generic_node(svdJob, dag, [], + svdNode = dagparts.DAGNode(svdJob, dag, [], opts = {"flow":options.flow, "svd-tolerance":options.tolerance, "ortho-gate-fap":0.5, diff --git a/gstlal-inspiral/bin/gstlal_ll_inspiral_daily_page 
b/gstlal-inspiral/bin/gstlal_ll_inspiral_daily_page index ff69fa8316e5a292e24b0d9f9279b2f412dbce19..c0742c334963f5b446b4f27ab26857a792a71385 100755 --- a/gstlal-inspiral/bin/gstlal_ll_inspiral_daily_page +++ b/gstlal-inspiral/bin/gstlal_ll_inspiral_daily_page @@ -28,7 +28,7 @@ from gstlal import aggregator import lal from lal import LIGOTimeGPS from multiprocessing import Pool -from gstlal import inspiral_pipe +from gstlal import dagparts from copy import copy def now(): @@ -119,7 +119,7 @@ if __name__ == '__main__': noninjdball = os.path.join(os.path.join(options.directory, d), 'H1L1-ALL_LLOID-%s00000-100000.sqlite' % (d,)) for injection_file in inj_file_bins: - injdball[injection_file] = os.path.join(os.path.join(options.directory, d), inspiral_pipe.T050017_filename(instruments, "ALL_LLOID_%s" % injtag(injection_file), (int(d) * 100000, (int(d) + 1) * 100000), '.sqlite')) + injdball[injection_file] = os.path.join(os.path.join(options.directory, d), dagparts.T050017_filename(instruments, "ALL_LLOID_%s" % injtag(injection_file), (int(d) * 100000, (int(d) + 1) * 100000), '.sqlite')) if float(now()) - float("%s00000" % d) > 125000 and all([os.path.exists(f) for f in injdball.values()]+[os.path.exists(noninjdball)]): print >> sys.stderr, "directory is %s %s greater than 125000 seconds old and has already been processed...continuing" % (n,d) diff --git a/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe b/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe index 0a376546e5e1fe74723691ed1352b6e20756e3d6..ec7a0213bb3d6705fe9c8f6ed1b74e06d4533411 100755 --- a/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe +++ b/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe @@ -66,14 +66,14 @@ from lal.utils import CacheEntry # } # @enddot -class lvalert_listen_job(inspiral_pipe.generic_job): +class lvalert_listen_job(dagparts.DAGJob): """ A lvalert_listen_job """ def __init__(self, program, gracedb_service_url = "https://gracedb.ligo.org/api/", gracedb_group = "CBC", gracedb_search = "LowMass", gracedb_pipeline = "gstlal", progs = ("gstlal_inspiral_lvalert_psd_plotter", "gstlal_inspiral_followups_from_gracedb"), inj_progs = ("gstlal_inspiral_lvalert_psd_plotter", "gstlal_inspiral_followups_from_gracedb"), condor_commands = {}, inj_gracedb_group = "CBC", inj_gracedb_search = "LowMass", inj_gracedb_pipeline = "gstlal", inj_gracedb_service_url = "https://simdb.cgca.uwm.edu/api/", injections = False): """ """ - inspiral_pipe.generic_job.__init__(self, program, universe = "local", condor_commands = condor_commands) + dagparts.DAGJob.__init__(self, program, universe = "local", condor_commands = condor_commands) # produce the lvalert processor @@ -116,14 +116,14 @@ class lvalert_listen_node(pipeline.CondorDAGNode): dag.add_node(self) -class zookeeper_job(inspiral_pipe.generic_job): +class zookeeper_job(dagparts.DAGJob): """ A zookeeper job """ def __init__(self, program = "zookeeper-server-start.sh", datadir = os.path.join(os.getcwd(), "zookeeper"), port = 2181, maxclients = 0, condor_commands = {}): """ """ - inspiral_pipe.generic_job.__init__(self, program, universe = "local", condor_commands = condor_commands) + dagparts.DAGJob.__init__(self, program, universe = "local", condor_commands = condor_commands) try: os.mkdir(datadir) @@ -142,14 +142,14 @@ maxClientCnxns=%d f.close() -class kafka_job(inspiral_pipe.generic_job): +class kafka_job(dagparts.DAGJob): """ A kafka job """ def __init__(self, program = "kafka-server-start.sh", logdir = os.path.join(os.getcwd(), "kafka"), host = "10.14.0.112:9092", zookeeperaddr = "localhost:2181", 
condor_commands = {}): """ """ - inspiral_pipe.generic_job.__init__(self, program, universe = "local", condor_commands = condor_commands) + dagparts.DAGJob.__init__(self, program, universe = "local", condor_commands = condor_commands) try: os.mkdir(logdir) @@ -314,44 +314,44 @@ try: os.mkdir("logs") except: pass try: os.mkdir("gracedb") except: pass -dag = dagparts.CondorDAG("trigger_pipe") +dag = dagparts.DAG("trigger_pipe") # # setup the job classes # -gstlalInspiralJob = inspiral_pipe.generic_job('gstlal_inspiral', condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.inspiral_condor_command, {"want_graceful_removal":"True", "kill_sig":"15"})) +gstlalInspiralJob = dagparts.DAGJob('gstlal_inspiral', condor_commands = dagparts.condor_command_dict_from_opts(options.inspiral_condor_command, {"want_graceful_removal":"True", "kill_sig":"15"})) if inj_channel_dict: - gstlalInspiralInjJob = inspiral_pipe.generic_job('gstlal_inspiral', tag_base = "gstlal_inspiral_inj", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.inspiral_condor_command, {"want_graceful_removal":"True", "kill_sig":"15"})) + gstlalInspiralInjJob = dagparts.DAGJob('gstlal_inspiral', tag_base = "gstlal_inspiral_inj", condor_commands = dagparts.condor_command_dict_from_opts(options.inspiral_condor_command, {"want_graceful_removal":"True", "kill_sig":"15"})) # A local universe job that will run in a loop marginalizing all of the likelihoods -margJob = inspiral_pipe.generic_job('gstlal_inspiral_marginalize_likelihoods_online', universe = "local", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.local_condor_command)) +margJob = dagparts.DAGJob('gstlal_inspiral_marginalize_likelihoods_online', universe = "local", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command)) # an lvalert_listen job -listenJob = lvalert_listen_job("lvalert_listen", gracedb_service_url = options.gracedb_service_url, gracedb_group = options.gracedb_group, gracedb_search = options.gracedb_search, gracedb_pipeline = options.gracedb_pipeline, progs = options.lvalert_listener_program, inj_progs = options.inj_lvalert_listener_program, condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.local_condor_command), inj_gracedb_service_url = options.inj_gracedb_service_url, inj_gracedb_group = options.inj_gracedb_group, inj_gracedb_search = options.inj_gracedb_search, inj_gracedb_pipeline = options.inj_gracedb_pipeline, injections = True if inj_channel_dict else False) +listenJob = lvalert_listen_job("lvalert_listen", gracedb_service_url = options.gracedb_service_url, gracedb_group = options.gracedb_group, gracedb_search = options.gracedb_search, gracedb_pipeline = options.gracedb_pipeline, progs = options.lvalert_listener_program, inj_progs = options.inj_lvalert_listener_program, condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command), inj_gracedb_service_url = options.inj_gracedb_service_url, inj_gracedb_group = options.inj_gracedb_group, inj_gracedb_search = options.inj_gracedb_search, inj_gracedb_pipeline = options.inj_gracedb_pipeline, injections = True if inj_channel_dict else False) # Zookeeper and Kafka Jobs and Nodes which only get set if you specify the kafka server if options.output_kafka_server is not None: - zooJob = zookeeper_job(condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.local_condor_command), port = options.zookeeper_port) - kafkaJob = kafka_job(condor_commands = 
inspiral_pipe.condor_command_dict_from_opts(options.local_condor_command), host = options.output_kafka_server, zookeeperaddr = "localhost:%d" % options.zookeeper_port) - zooNode = inspiral_pipe.generic_node(zooJob, dag, [], opts = {"":"zookeeper.properties"}) - kafkaNode = inspiral_pipe.generic_node(kafkaJob, dag, [], opts = {"":"kafka.properties"}) + zooJob = zookeeper_job(condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command), port = options.zookeeper_port) + kafkaJob = kafka_job(condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command), host = options.output_kafka_server, zookeeperaddr = "localhost:%d" % options.zookeeper_port) + zooNode = dagparts.DAGNode(zooJob, dag, [], opts = {"":"zookeeper.properties"}) + kafkaNode = dagparts.DAGNode(kafkaJob, dag, [], opts = {"":"kafka.properties"}) # aggregator job -aggJob = inspiral_pipe.generic_job("gstlal_ll_inspiral_aggregator", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.non_inspiral_condor_command)) +aggJob = dagparts.DAGJob("gstlal_ll_inspiral_aggregator", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command)) # state job -analysisStateJob = inspiral_pipe.generic_job("gstlal_ll_inspiral_state", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.non_inspiral_condor_command)) +analysisStateJob = dagparts.DAGJob("gstlal_ll_inspiral_state", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command)) # Summary page job -pageJob = inspiral_pipe.generic_job("gstlal_ll_inspiral_daily_page_online", universe = "local", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.local_condor_command)) +pageJob = dagparts.DAGJob("gstlal_ll_inspiral_daily_page_online", universe = "local", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command)) # DQ job -dqJob = inspiral_pipe.generic_job("gstlal_ll_dq", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.non_inspiral_condor_command)) +dqJob = dagparts.DAGJob("gstlal_ll_dq", condor_commands = dagparts.condor_command_dict_from_opts(options.non_inspiral_condor_command)) if options.state_backup_destination: # State saving job - stateJob = inspiral_pipe.generic_job("gstlal_ll_inspiral_save_state", universe = "local", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.local_condor_command)) + stateJob = dagparts.DAGJob("gstlal_ll_inspiral_save_state", universe = "local", condor_commands = dagparts.condor_command_dict_from_opts(options.local_condor_command)) # # Setup the Node classes @@ -390,7 +390,7 @@ for ifo in channel_dict: "out-path": outpath } common_opts.update(datasource_opts) - inspiral_pipe.generic_node(dqJob, dag, [], opts = common_opts) + dagparts.DAGNode(dqJob, dag, [], opts = common_opts) # # loop over banks to run gstlal inspiral pre clustering and far computation @@ -445,7 +445,7 @@ for num_insp_nodes, (svd_banks, likefile, zerolikefile) in enumerate(zip(bank_gr "dq-vector-on-bits":options.dq_vector_on_bits, "dq-vector-off-bits":options.dq_vector_off_bits, "svd-bank":svd_bank_string, - "tmp-space":inspiral_pipe.condor_scratch_space(), + "tmp-space":dagparts.condor_scratch_space(), "track-psd":"", "control-peak-time":options.control_peak_time, "coincidence-threshold":options.coincidence_threshold, @@ -464,7 +464,7 @@ for num_insp_nodes, (svd_banks, likefile, zerolikefile) in enumerate(zip(bank_gr } common_opts.update(datasource_opts) - 
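# Example (illustration, not part of the patch): how the opts / input_files /
# output_files dictionaries handed to dagparts.DAGNode, such as common_opts
# above, are expected to turn into command-line arguments.  This is an
# approximate, standalone re-statement of the rules in the
# inspiral_pipe.generic_node implementation removed later in this patch, on
# the assumption that dagparts.DAGNode keeps them: None values are skipped,
# a "" key becomes a bare positional argument, list values repeat the option,
# and everything else becomes a single --key value pair.
def options_to_argv(opts):
	argv = []
	for opt, val in sorted(opts.items()):
		if val is None:
			continue	# not the same as "", which is still passed
		if opt == "":
			argv.extend(val if hasattr(val, "__iter__") else [val])
		elif hasattr(val, "__iter__"):	# lists etc., but not strings (Python 2)
			for v in val:
				argv.extend(["--%s" % opt, str(v)])
		else:
			argv.extend(["--%s" % opt, str(val)])
	return argv

# With hypothetical values:
# options_to_argv({"tmp-space": "_CONDOR_SCRATCH_DIR", "track-psd": "",
#                  "channel-name": ["H1=FAKE", "L1=FAKE"], "verbose": None})
# -> ['--channel-name', 'H1=FAKE', '--channel-name', 'L1=FAKE',
#     '--tmp-space', '_CONDOR_SCRATCH_DIR', '--track-psd', '']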
inspNode = inspiral_pipe.generic_node(gstlalInspiralJob, dag, [], + inspNode = dagparts.DAGNode(gstlalInspiralJob, dag, [], opts = common_opts, input_files = { "ranking-stat-input":[likefile], @@ -507,7 +507,7 @@ for num_insp_nodes, (svd_banks, likefile, zerolikefile) in enumerate(zip(bank_gr "dq-vector-on-bits":options.inj_dq_vector_on_bits, "dq-vector-off-bits":options.inj_dq_vector_off_bits, "svd-bank":svd_bank_string, - "tmp-space":inspiral_pipe.condor_scratch_space(), + "tmp-space":dagparts.condor_scratch_space(), "track-psd":"", "control-peak-time":options.control_peak_time, "coincidence-threshold":options.coincidence_threshold, @@ -524,7 +524,7 @@ for num_insp_nodes, (svd_banks, likefile, zerolikefile) in enumerate(zip(bank_gr "time-slide-file":options.time_slide_file } common_opts.update(datasource_opts) - inspInjNode = inspiral_pipe.generic_node(gstlalInspiralInjJob, dag, [], + inspInjNode = dagparts.DAGNode(gstlalInspiralInjJob, dag, [], opts = common_opts, input_files = { "ranking-stat-input":[likefile], @@ -539,7 +539,7 @@ def groups(l, n): for i in xrange(0, len(l), n): yield l[i:i+n] -margNode = inspiral_pipe.generic_node(margJob, dag, [], opts = {}, input_files = {"":[options.marginalized_likelihood_file] + ["%s_registry.txt" % r for r in jobTags]}, output_files = {}) +margNode = dagparts.DAGNode(margJob, dag, [], opts = {}, input_files = {"":[options.marginalized_likelihood_file] + ["%s_registry.txt" % r for r in jobTags]}, output_files = {}) # # FIXME by default the inspiral jobs advertise the current directory as their @@ -547,27 +547,27 @@ margNode = inspiral_pipe.generic_node(margJob, dag, [], opts = {}, input_files = # snr_routes = ["%s_snr_history" % ifo for ifo in channel_dict] -aggNode = inspiral_pipe.generic_node(aggJob, dag, [], opts = {"dump-period": 1, "base-dir":"aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "data-type":["max"], "job-start":0, "route": snr_routes, "kafka-server": options.output_kafka_server}) -aggNode = inspiral_pipe.generic_node(aggJob, dag, [], opts = {"dump-period": 1, "base-dir":"aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "data-type":["max"], "job-start":0, "route": ["likelihood_history", "snr_history", "latency_history"], "kafka-server": options.output_kafka_server}) -aggNode = inspiral_pipe.generic_node(aggJob, dag, [], opts = {"dump-period": 1, "base-dir":"aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "job-start":0, "route": ["far_history", "latency_history"], "data-type":["min"], "kafka-server": options.output_kafka_server}) +aggNode = dagparts.DAGNode(aggJob, dag, [], opts = {"dump-period": 1, "base-dir":"aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "data-type":["max"], "job-start":0, "route": snr_routes, "kafka-server": options.output_kafka_server}) +aggNode = dagparts.DAGNode(aggJob, dag, [], opts = {"dump-period": 1, "base-dir":"aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "data-type":["max"], "job-start":0, "route": ["likelihood_history", "snr_history", "latency_history"], "kafka-server": options.output_kafka_server}) +aggNode = dagparts.DAGNode(aggJob, dag, [], opts = {"dump-period": 1, "base-dir":"aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "job-start":0, "route": ["far_history", "latency_history"], "data-type":["min"], "kafka-server": options.output_kafka_server}) -analysisStateNode = inspiral_pipe.generic_node(analysisStateJob, dag, [], opts = {"dump-period": 1, "job-tag": os.getcwd(), "num-jobs": len(jobTags), 
"num-threads": 2, "instrument": channel_dict.keys(), "kafka-server": options.output_kafka_server}) +analysisStateNode = dagparts.DAGNode(analysisStateJob, dag, [], opts = {"dump-period": 1, "job-tag": os.getcwd(), "num-jobs": len(jobTags), "num-threads": 2, "instrument": channel_dict.keys(), "kafka-server": options.output_kafka_server}) # summary page if options.injection_file: - pageNode = inspiral_pipe.generic_node(pageJob, dag, [], opts = {"directory":".", "injection-file": options.injection_file, "web-dir": options.web_dir}, input_files = {"":jobTags}, output_files = {}) + pageNode = dagparts.DAGNode(pageJob, dag, [], opts = {"directory":".", "injection-file": options.injection_file, "web-dir": options.web_dir}, input_files = {"":jobTags}, output_files = {}) for injfile, jobrange in inj_range_dict.items(): - aggNode = inspiral_pipe.generic_node(aggJob, dag, [], opts = {"dump-period": 1, "base-dir":"%s_aggregator" % injfile.split(".")[0], "job-tag": os.getcwd(), "job-start": int(min(jobrange))+1000, "num-jobs": len(jobrange), "route": ["far_history", "likelihood_history", "snr_history"], "data-type":["max", "min"]}) + aggNode = dagparts.DAGNode(aggJob, dag, [], opts = {"dump-period": 1, "base-dir":"%s_aggregator" % injfile.split(".")[0], "job-tag": os.getcwd(), "job-start": int(min(jobrange))+1000, "num-jobs": len(jobrange), "route": ["far_history", "likelihood_history", "snr_history"], "data-type":["max", "min"]}) else: - pageNode = inspiral_pipe.generic_node(pageJob, dag, [], opts = {"directory":".", "web-dir": options.web_dir}, input_files = {"":jobTags}, output_files = {}) + pageNode = dagparts.DAGNode(pageJob, dag, [], opts = {"directory":".", "web-dir": options.web_dir}, input_files = {"":jobTags}, output_files = {}) if options.state_backup_destination: - stateNode = inspiral_pipe.generic_node(stateJob, dag, [], opts = {}, input_files = {"":[options.state_backup_destination, options.marginalized_likelihood_file] + options.likelihood_files}, output_files = {}) + stateNode = dagparts.DAGNode(stateJob, dag, [], opts = {}, input_files = {"":[options.state_backup_destination, options.marginalized_likelihood_file] + options.likelihood_files}, output_files = {}) # diff --git a/gstlal-inspiral/python/inspiral_pipe.py b/gstlal-inspiral/python/inspiral_pipe.py index 04420e35d7c840c9338ae1acb29071aacf8743ad..42d96b73340897a1eec6a42addb91d6e412ab592 100644 --- a/gstlal-inspiral/python/inspiral_pipe.py +++ b/gstlal-inspiral/python/inspiral_pipe.py @@ -36,10 +36,7 @@ # - In inspiral_pipe.py Fix the InsiralJob.___init___: fix the arguments # - On line 201, fix the comment or explain what the comment is meant to be -import math -import sys, os -import subprocess, socket, tempfile, copy, doctest -from glue import pipeline +import socket, copy, doctest from ligo import segments from glue.ligolw import lsctables, ligolw from glue.ligolw import utils as ligolw_utils @@ -52,62 +49,6 @@ from lal.utils import CacheEntry # -def which(prog): - """! - Like the which program to find the path to an executable - - >>> which("ls") - '/bin/ls' - - """ - which = subprocess.Popen(['which',prog], stdout=subprocess.PIPE) - out = which.stdout.read().strip() - if not out: - print >>sys.stderr, "ERROR: could not find %s in your path, have you built the proper software and source the proper env. scripts?" % (prog,prog) - raise ValueError - return out - - -def condor_scratch_space(): - """! 
- A way to standardize the condor scratch space even if it changes - >>> condor_scratch_space() - '_CONDOR_SCRATCH_DIR' - """ - return "_CONDOR_SCRATCH_DIR" - - -def log_path(): - """! - The stupid pet tricks to find log space on the LDG. - Defaults to checking TMPDIR first. - """ - host = socket.getfqdn() - try: - return os.environ['TMPDIR'] - except KeyError: - print "\n\n!!!! $TMPDIR NOT SET !!!!\n\n\tPLEASE email your admin to tell them to set $TMPDIR to be the place where a users temporary files should be\n" - #FIXME add more hosts as you need them - if 'cit' in host or 'caltech.edu' in host: - tmp = '/usr1/' + os.environ['USER'] - print "falling back to ", tmp - return tmp - if 'phys.uwm.edu' in host: - tmp = '/localscratch/' + os.environ['USER'] - print "falling back to ", tmp - return tmp - if 'aei.uni-hannover.de' in host: - tmp = '/local/user/' + os.environ['USER'] - print "falling back to ", tmp - return tmp - if 'phy.syr.edu' in host: - tmp = '/usr1/' + os.environ['USER'] - print "falling back to ", tmp - return tmp - - raise KeyError("$TMPDIR is not set and I don't recognize this environment") - - def webserver_url(): """! The stupid pet tricks to find webserver on the LDG. @@ -127,177 +68,6 @@ def webserver_url(): raise NotImplementedError("I don't know where the webserver is for this environment") -# -# DAG class -# - - -class DAG(pipeline.CondorDAG): - """! - A thin subclass of pipeline.CondorDAG. - - Extra features include an add_node() method and a cache writing method. - Also includes some standard setup, e.g., log file paths etc. - """ - def __init__(self, name, logpath = log_path()): - self.basename = name.replace(".dag","") - tempfile.tempdir = logpath - tempfile.template = self.basename + '.dag.log.' - logfile = tempfile.mktemp() - fh = open( logfile, "w" ) - fh.close() - pipeline.CondorDAG.__init__(self,logfile) - self.set_dag_file(self.basename) - self.jobsDict = {} - self.output_cache = [] - - def add_node(self, node): - node.set_retry(3) - node.add_macro("macronodename", node.get_name()) - pipeline.CondorDAG.add_node(self, node) - - def write_cache(self): - out = self.basename + ".cache" - f = open(out,"w") - for c in self.output_cache: - f.write(str(c)+"\n") - f.close() - - -class InspiralJob(pipeline.CondorDAGJob): - """! - A job class that subclasses pipeline.CondorDAGJob and adds some extra - boiler plate items for gstlal inspiral jobs - """ - def __init__(self, executable, tag_base, universe = "vanilla"): - self.__prog__ = tag_base - self.__executable = executable - self.__universe = universe - pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable) - self.add_condor_cmd('getenv','True') - self.add_condor_cmd('environment',"GST_REGISTRY_UPDATE=no;") - self.tag_base = tag_base - self.set_sub_file(tag_base+'.sub') - self.set_stdout_file('logs/$(macronodename)-$(cluster)-$(process).out') - self.set_stderr_file('logs/$(macronodename)-$(cluster)-$(process).err') - self.number = 1 - # make an output directory for files - self.output_path = tag_base - try: - os.mkdir(self.output_path) - except: - pass - - -class InspiralNode(pipeline.CondorDAGNode): - """! - A node class that subclasses pipeline.CondorDAGNode that automates - adding the node to the dag, makes sensible names and allows a list of parent - nodes to be provided. 
- """ - def __init__(self, job, dag, p_node=[]): - pipeline.CondorDAGNode.__init__(self, job) - for p in p_node: - self.add_parent(p) - self.set_name("%s_%04X" % (job.tag_base, job.number)) - job.number += 1 - dag.add_node(self) - - -class generic_job(InspiralJob): - """! - A generic job class which tends to do the "right" thing when given just - an executable name but otherwise is a subclass of InspiralJob and thus - pipeline.CondorDAGJob - """ - def __init__(self, program, tag_base = None, condor_commands = {}, **kwargs): - executable = which(program) - InspiralJob.__init__(self, executable, tag_base or os.path.split(executable)[1], **kwargs) - for cmd,val in condor_commands.items(): - self.add_condor_cmd(cmd, val) - - -class generic_node(InspiralNode): - """! - A generic node class which tends to do the "right" thing when given a - job, a dag, parent nodes, a dictionary options relevant to the job, a - dictionary of options related to input files and a dictionary of options - related to output files. Otherwise it is a subclass of InspiralNode and thus - pipeline.CondorDAGNode - - NOTE and important and subtle behavior - You can specify an option with - an empty argument by setting it to "". However options set to None are simply - ignored. - """ - def __init__(self, job, dag, parent_nodes, opts = {}, input_files = {}, output_files = {}, input_cache_files = {}, output_cache_files = {}, input_cache_file_name = None): - InspiralNode.__init__(self, job, dag, parent_nodes) - - self.input_files = input_files.copy() - self.input_files.update(input_cache_files) - self.output_files = output_files.copy() - self.output_files.update(output_cache_files) - - self.cache_inputs = {} - self.cache_outputs = {} - - for opt, val in opts.items() + output_files.items() + input_files.items(): - if val is None: - continue # not the same as val = '' which is allowed - if not hasattr(val, "__iter__"): # catches list like things but not strings - if opt == "": - self.add_var_arg(val) - else: - self.add_var_opt(opt, val) - # Must be an iterable - else: - if opt == "": - [self.add_var_arg(a) for a in val] - else: - self.add_var_opt(opt, pipeline_dot_py_append_opts_hack(opt, val)) - - # Create cache files for long command line arguments and store them in the job's subdirectory. 
NOTE the svd-bank string - # is handled by gstlal_inspiral_pipe directly - - cache_dir = os.path.join(job.tag_base, 'cache') - - for opt, val in input_cache_files.items(): - if not os.path.isdir(cache_dir): - os.mkdir(cache_dir) - cache_entries = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in val] - if input_cache_file_name is None: - cache_file_name = group_T050017_filename_from_T050017_files(cache_entries, '.cache', path = cache_dir) - else: - cache_file_name = os.path.join(cache_dir, input_cache_file_name) - open(cache_file_name, "w").write("\n".join(map(str, cache_entries))) - self.add_var_opt(opt, cache_file_name) - # Keep track of the cache files being created - self.cache_inputs.setdefault(opt, []).append(cache_file_name) - - for opt, val in output_cache_files.items(): - if not os.path.isdir(cache_dir): - os.mkdir(cache_dir) - cache_entries = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in val] - cache_file_name = group_T050017_filename_from_T050017_files(cache_entries, '.cache', path = cache_dir) - open(cache_file_name, "w").write("\n".join(map(str, cache_entries))) - self.add_var_opt(opt, cache_file_name) - # Keep track of the cache files being created - self.cache_outputs.setdefault(opt, []).append(cache_file_name) - -def pipeline_dot_py_append_opts_hack(opt, vals): - """! - A way to work around the dictionary nature of pipeline.py which can - only record options once. - - >>> pipeline_dot_py_append_opts_hack("my-favorite-option", [1,2,3]) - '1 --my-favorite-option 2 --my-favorite-option 3' - """ - out = str(vals[0]) - for v in vals[1:]: - out += " --%s %s" % (opt, str(v)) - return out - - - # # Utility functions # @@ -366,95 +136,6 @@ def build_bank_groups(cachedict, numbanks = [2], maxjobs = None): return outstrs -def T050017_filename(instruments, description, seg, extension, path = None): - """! - A function to generate a T050017 filename. - """ - if not isinstance(instruments, basestring): - instruments = "".join(sorted(instruments)) - start, end = seg - start = int(math.floor(start)) - try: - duration = int(math.ceil(end)) - start - # FIXME this is not a good way of handling this... - except OverflowError: - duration = 2000000000 - extension = extension.strip('.') - if path is not None: - return '%s/%s-%s-%d-%d.%s' % (path, instruments, description, start, duration, extension) - else: - return '%s-%s-%d-%d.%s' % (instruments, description, start, duration, extension) - - -if __name__ == "__main__": - import doctest - doctest.testmod() - - -def condor_command_dict_from_opts(opts, defaultdict = None): - """! 
- A function to turn a list of options into a dictionary of condor commands, e.g., - - >>> condor_command_dict_from_opts(["+Online_CBC_SVD=True", "TARGET.Online_CBC_SVD =?= True"]) - {'TARGET.Online_CBC_SVD ': '?= True', '+Online_CBC_SVD': 'True'} - >>> condor_command_dict_from_opts(["+Online_CBC_SVD=True", "TARGET.Online_CBC_SVD =?= True"], {"somecommand":"somevalue"}) - {'somecommand': 'somevalue', 'TARGET.Online_CBC_SVD ': '?= True', '+Online_CBC_SVD': 'True'} - >>> condor_command_dict_from_opts(["+Online_CBC_SVD=True", "TARGET.Online_CBC_SVD =?= True"], {"+Online_CBC_SVD":"False"}) - {'TARGET.Online_CBC_SVD ': '?= True', '+Online_CBC_SVD': 'True'} - """ - - if defaultdict is None: - defaultdict = {} - for o in opts: - osplit = o.split("=") - k = osplit[0] - v = "=".join(osplit[1:]) - defaultdict.update([(k, v)]) - return defaultdict - - -def group_T050017_filename_from_T050017_files(cache_entries, extension, path = None): - """! - A function to return the name of a file created from multiple files following - the T050017 convention. In addition to the T050017 requirements, this assumes - that numbers relevant to organization schemes will be the first entry in the - description, e.g. 0_DIST_STATS, and that all files in a given cache file are - from the same group of ifos and either contain data from the same segment or - from the same background bin. Note, that each file doesn't have to be from - the same IFO, for example the template bank cache could contain template bank - files from H1 and template bank files from L1. - """ - # Check that every file has same observatory. - observatories = ''.join(sorted(list(set([cache_entry.observatory for cache_entry in cache_entries])))) - split_description = cache_entries[0].description.split('_') - min_bin = [x for x in split_description[:2] if x.isdigit()] - max_bin = [x for x in cache_entries[-1].description.split('_')[:2] if x.isdigit()] - seg = segments.segmentlist(cache_entry.segment for cache_entry in cache_entries).extent() - if min_bin: - min_bin = min_bin[0] - if max_bin: - max_bin = max_bin[-1] - if min_bin and (min_bin == max_bin or not max_bin): - # All files from same bin, thus segments may be different. - # Note that this assumes that if the last file in the cache - # does not start with a number that every file in the cache is - # from the same bin, an example of this is the cache file - # generated for gstlal_inspiral_calc_likelihood, which contains - # all of the DIST_STATS files from a given background bin and - # then CREATE_PRIOR_DIST_STATS files which are not generated - # for specific bins - return T050017_filename(observatories, cache_entries[0].description, seg, extension, path = path) - elif min_bin and max_bin and min_bin != max_bin: - if split_description[1].isdigit(): - description_base = split_description[2:] - else: - description_base = split_description[1:] - # Files from different bins, thus segments must be same - return T050017_filename(observatories, '_'.join([min_bin, max_bin] + description_base), seg, extension, path = path) - else: - print >>sys.stderr, "ERROR: first and last file of cache file do not match known pattern, cannot name group file under T050017 convention. 
\nFile 1: %s\nFile 2: %s" % (cache_entries[0].path, cache_entries[-1].path) - raise ValueError - def get_svd_bank_params_online(svd_bank_cache): template_mchirp_dict = {} for ce in [CacheEntry(f) for f in open(svd_bank_cache)]: @@ -469,6 +150,7 @@ def get_svd_bank_params_online(svd_bank_cache): xmldoc.unlink() return template_mchirp_dict + def get_svd_bank_params(svd_bank_cache, online = False): if not online: bgbin_file_map = {} @@ -492,3 +174,8 @@ def get_svd_bank_params(svd_bank_cache, online = False): return template_mchirp_dict, bgbin_file_map, max_time else: return template_mchirp_dict + + +if __name__ == "__main__": + import doctest + doctest.testmod() diff --git a/gstlal-ugly/bin/gstlal_inspiral_treebank_dag b/gstlal-ugly/bin/gstlal_inspiral_treebank_dag index eae4f7450952a72f86d73ccdc639279d53a1ab8c..9c95add2f8ab18e13ad139e72da6a432c504df06 100755 --- a/gstlal-ugly/bin/gstlal_inspiral_treebank_dag +++ b/gstlal-ugly/bin/gstlal_inspiral_treebank_dag @@ -9,8 +9,8 @@ from glue.ligolw import ilwd from glue.ligolw.utils import process as ligolw_process from gstlal import metric as metric_module import os,sys,argparse -from gstlal import inspiral, inspiral_pipe -from gstlal import dagparts as gstlaldagparts +from gstlal import inspiral +from gstlal import dagparts # Read command line options def parse_command_line(): @@ -79,8 +79,8 @@ def parse_command_line(): return args args = parse_command_line() -dag = inspiral_pipe.DAG("treebank") -treeJob = inspiral_pipe.generic_job("gstlal_inspiral_treebank") +dag = dagparts.DAG("treebank") +treeJob = dagparts.DAGJob("gstlal_inspiral_treebank") argsdict = dict((key.replace("_","-"), value) for key,value in vars(args).items()) mass2 = numpy.logspace(math.log(args.min_mass2, 8/3.), math.log(args.max_mass2, 8/3.), args.num_jobs, base=8/3.) del argsdict["num-jobs"] @@ -91,7 +91,7 @@ for minmass2,maxmass2 in zip(mass2[:-1],mass2[1:]): argsdict["max-mass2"] = maxmass2 argsdict["user-tag"] = cnt cnt+=1 - pnodes.append(inspiral_pipe.generic_node(treeJob, dag, parent_nodes = [], input_files = {}, output_files = {}, opts = argsdict)) + pnodes.append(dagparts.DAGNode(treeJob, dag, parent_nodes = [], input_files = {}, output_files = {}, opts = argsdict)) #FIXME add a ligolw_add job to the end of the dag try: diff --git a/gstlal/bin/gstlal_fake_frames_pipe b/gstlal/bin/gstlal_fake_frames_pipe index f4dce97da4a590997ef4093f73e3a1c1b0dc94b6..cd3237641d0b3a3c366172d94cce5f8195e392bd 100755 --- a/gstlal/bin/gstlal_fake_frames_pipe +++ b/gstlal/bin/gstlal_fake_frames_pipe @@ -294,7 +294,7 @@ try: except: pass -dag = dagparts.CondorDAG("gstlal_fake_frames_pipe") +dag = dagparts.DAG("gstlal_fake_frames_pipe") seglists = ligolw_segments.segmenttable_get_by_name(ligolw_utils.load_filename(options.frame_segments_file, verbose = options.verbose, contenthandler = ligolw_segments.LIGOLWContentHandler), options.frame_segments_name).coalesce() choosesegs(seglists, options.min_segment_length) diff --git a/gstlal/python/dagparts.py b/gstlal/python/dagparts.py index 4a2ef8a221ce9c33dad1f54d2008d1c7d9ae38a5..f2fa361e85c53b83b49ae6f361ee5a4117fdd874 100644 --- a/gstlal/python/dagparts.py +++ b/gstlal/python/dagparts.py @@ -33,16 +33,18 @@ DAG construction tools. 
""" +import doctest +import math import os import sys import socket import subprocess import tempfile -import math from ligo import segments from glue import pipeline +from lal.utils import CacheEntry __author__ = "Kipp Cannon <kipp.cannon@ligo.org>, Chad Hanna <chad.hanna@ligo.org>" __date__ = "$Date$" #FIXME @@ -67,7 +69,20 @@ def which(prog): return out +def condor_scratch_space(): + """! + A way to standardize the condor scratch space even if it changes + >>> condor_scratch_space() + '_CONDOR_SCRATCH_DIR' + """ + return "_CONDOR_SCRATCH_DIR" + + def log_path(): + """! + The stupid pet tricks to find log space on the LDG. + Defaults to checking TMPDIR first. + """ host = socket.getfqdn() try: return os.environ['TMPDIR'] @@ -103,22 +118,27 @@ def log_path(): # -class CondorDAG(pipeline.CondorDAG): +class DAG(pipeline.CondorDAG): + """! + A thin subclass of pipeline.CondorDAG. + Extra features include an add_node() method and a cache writing method. + Also includes some standard setup, e.g., log file paths etc. + """ def __init__(self, name, logpath = log_path()): - self.basename = name - fh, logfile = tempfile.mkstemp(dir = log_path(), prefix = self.basename + '.dag.log.') - os.close(fh) + self.basename = name.replace(".dag","") + tempfile.tempdir = logpath + tempfile.template = self.basename + '.dag.log.' + logfile = tempfile.mktemp() + fh = open( logfile, "w" ) + fh.close() pipeline.CondorDAG.__init__(self,logfile) self.set_dag_file(self.basename) self.jobsDict = {} - self.node_id = 0 self.output_cache = [] - def add_node(self, node, retry = 0): + def add_node(self, node, retry = 3): node.set_retry(retry) - self.node_id += 1 - node.add_macro("macroid", self.node_id) node.add_macro("macronodename", node.get_name()) pipeline.CondorDAG.add_node(self, node) @@ -130,33 +150,146 @@ class CondorDAG(pipeline.CondorDAG): f.close() -class CondorDAGJob(pipeline.CondorDAGJob): - """ - A generic job class for gstlal stuff +class DAGJob(pipeline.CondorDAGJob): + """! + A job class that subclasses pipeline.CondorDAGJob and adds some extra + boiler plate items for gstlal jobs which tends to do the "right" thing + when given just an executable name. 
""" - def __init__(self, executable, tag_base): - self.__prog__ = tag_base - self.__executable = executable - self.__universe = 'vanilla' + def __init__(self, executable, tag_base = None, universe = "vanilla", condor_commands = {}): + self.__executable = which(executable) + self.__universe = universe + if tag_base: + self.tag_base = tag_base + else: + self.tag_base = os.path.split(self.__executable)[1] + self.__prog__ = self.tag_base pipeline.CondorDAGJob.__init__(self, self.__universe, self.__executable) self.add_condor_cmd('getenv','True') - self.tag_base = tag_base - self.set_sub_file(tag_base+'.sub') - self.set_stdout_file('logs/'+tag_base+'-$(macroid)-$(macronodename)-$(cluster)-$(process).out') - self.set_stderr_file('logs/'+tag_base+'-$(macroid)-$(macronodename)-$(cluster)-$(process).err') + self.add_condor_cmd('environment',"GST_REGISTRY_UPDATE=no;") + self.set_sub_file(self.tag_base+'.sub') + self.set_stdout_file('logs/$(macronodename)-$(cluster)-$(process).out') + self.set_stderr_file('logs/$(macronodename)-$(cluster)-$(process).err') self.number = 1 - - -class CondorDAGNode(pipeline.CondorDAGNode): - """ - A generic node class for gstlal stuff + # make an output directory for files + self.output_path = self.tag_base + try: + os.mkdir(self.output_path) + except: + pass + for cmd,val in condor_commands.items(): + self.add_condor_cmd(cmd, val) + + +class DAGNode(pipeline.CondorDAGNode): + """! + A node class that subclasses pipeline.CondorDAGNode that automates + adding the node to the dag, makes sensible names and allows a list of parent + nodes to be provided. + + It tends to do the "right" thing when given a job, a dag, parent nodes, dictionary + options relevant to the job, a dictionary of options related to input files and a + dictionary of options related to output files. + + NOTE and important and subtle behavior - You can specify an option with + an empty argument by setting it to "". However options set to None are simply + ignored. """ - def __init__(self, job, dag, p_node=[]): + def __init__(self, job, dag, parent_nodes, opts = {}, input_files = {}, output_files = {}, input_cache_files = {}, output_cache_files = {}, input_cache_file_name = None): pipeline.CondorDAGNode.__init__(self, job) - for p in p_node: + for p in parent_nodes: self.add_parent(p) + self.set_name("%s_%04X" % (job.tag_base, job.number)) + job.number += 1 dag.add_node(self) + self.input_files = input_files.copy() + self.input_files.update(input_cache_files) + self.output_files = output_files.copy() + self.output_files.update(output_cache_files) + + self.cache_inputs = {} + self.cache_outputs = {} + + for opt, val in opts.items() + output_files.items() + input_files.items(): + if val is None: + continue # not the same as val = '' which is allowed + if not hasattr(val, "__iter__"): # catches list like things but not strings + if opt == "": + self.add_var_arg(val) + else: + self.add_var_opt(opt, val) + # Must be an iterable + else: + if opt == "": + [self.add_var_arg(a) for a in val] + else: + self.add_var_opt(opt, pipeline_dot_py_append_opts_hack(opt, val)) + + # Create cache files for long command line arguments and store them in the job's subdirectory. 
NOTE the svd-bank string + # is handled by gstlal_inspiral_pipe directly + + cache_dir = os.path.join(job.tag_base, 'cache') + + for opt, val in input_cache_files.items(): + if not os.path.isdir(cache_dir): + os.mkdir(cache_dir) + cache_entries = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in val] + if input_cache_file_name is None: + cache_file_name = group_T050017_filename_from_T050017_files(cache_entries, '.cache', path = cache_dir) + else: + cache_file_name = os.path.join(cache_dir, input_cache_file_name) + open(cache_file_name, "w").write("\n".join(map(str, cache_entries))) + self.add_var_opt(opt, cache_file_name) + # Keep track of the cache files being created + self.cache_inputs.setdefault(opt, []).append(cache_file_name) + + for opt, val in output_cache_files.items(): + if not os.path.isdir(cache_dir): + os.mkdir(cache_dir) + cache_entries = [CacheEntry.from_T050017("file://localhost%s" % os.path.abspath(filename)) for filename in val] + cache_file_name = group_T050017_filename_from_T050017_files(cache_entries, '.cache', path = cache_dir) + open(cache_file_name, "w").write("\n".join(map(str, cache_entries))) + self.add_var_opt(opt, cache_file_name) + # Keep track of the cache files being created + self.cache_outputs.setdefault(opt, []).append(cache_file_name) + + +def condor_command_dict_from_opts(opts, defaultdict = None): + """! + A function to turn a list of options into a dictionary of condor commands, e.g., + + >>> condor_command_dict_from_opts(["+Online_CBC_SVD=True", "TARGET.Online_CBC_SVD =?= True"]) + {'TARGET.Online_CBC_SVD ': '?= True', '+Online_CBC_SVD': 'True'} + >>> condor_command_dict_from_opts(["+Online_CBC_SVD=True", "TARGET.Online_CBC_SVD =?= True"], {"somecommand":"somevalue"}) + {'somecommand': 'somevalue', 'TARGET.Online_CBC_SVD ': '?= True', '+Online_CBC_SVD': 'True'} + >>> condor_command_dict_from_opts(["+Online_CBC_SVD=True", "TARGET.Online_CBC_SVD =?= True"], {"+Online_CBC_SVD":"False"}) + {'TARGET.Online_CBC_SVD ': '?= True', '+Online_CBC_SVD': 'True'} + """ + + if defaultdict is None: + defaultdict = {} + for o in opts: + osplit = o.split("=") + k = osplit[0] + v = "=".join(osplit[1:]) + defaultdict.update([(k, v)]) + return defaultdict + + +def pipeline_dot_py_append_opts_hack(opt, vals): + """! + A way to work around the dictionary nature of pipeline.py which can + only record options once. + + >>> pipeline_dot_py_append_opts_hack("my-favorite-option", [1,2,3]) + '1 --my-favorite-option 2 --my-favorite-option 3' + """ + out = str(vals[0]) + for v in vals[1:]: + out += " --%s %s" % (opt, str(v)) + return out + # # ============================================================================= @@ -207,3 +340,80 @@ def breakupseglists(seglists, maxextent, overlap): for bigseg in seglist: newseglist.extend(breakupseg(bigseg, maxextent, overlap)) seglists[instrument] = newseglist + + +# +# ============================================================================= +# +# File utilities +# +# ============================================================================= +# + + +def T050017_filename(instruments, description, seg, extension, path = None): + """! + A function to generate a T050017 filename. + """ + if not isinstance(instruments, basestring): + instruments = "".join(sorted(instruments)) + start, end = seg + start = int(math.floor(start)) + try: + duration = int(math.ceil(end)) - start + # FIXME this is not a good way of handling this... 
+ except OverflowError: + duration = 2000000000 + extension = extension.strip('.') + if path is not None: + return '%s/%s-%s-%d-%d.%s' % (path, instruments, description, start, duration, extension) + else: + return '%s-%s-%d-%d.%s' % (instruments, description, start, duration, extension) + + +def group_T050017_filename_from_T050017_files(cache_entries, extension, path = None): + """! + A function to return the name of a file created from multiple files following + the T050017 convention. In addition to the T050017 requirements, this assumes + that numbers relevant to organization schemes will be the first entry in the + description, e.g. 0_DIST_STATS, and that all files in a given cache file are + from the same group of ifos and either contain data from the same segment or + from the same background bin. Note, that each file doesn't have to be from + the same IFO, for example the template bank cache could contain template bank + files from H1 and template bank files from L1. + """ + # Check that every file has same observatory. + observatories = ''.join(sorted(list(set([cache_entry.observatory for cache_entry in cache_entries])))) + split_description = cache_entries[0].description.split('_') + min_bin = [x for x in split_description[:2] if x.isdigit()] + max_bin = [x for x in cache_entries[-1].description.split('_')[:2] if x.isdigit()] + seg = segments.segmentlist(cache_entry.segment for cache_entry in cache_entries).extent() + if min_bin: + min_bin = min_bin[0] + if max_bin: + max_bin = max_bin[-1] + if min_bin and (min_bin == max_bin or not max_bin): + # All files from same bin, thus segments may be different. + # Note that this assumes that if the last file in the cache + # does not start with a number that every file in the cache is + # from the same bin, an example of this is the cache file + # generated for gstlal_inspiral_calc_likelihood, which contains + # all of the DIST_STATS files from a given background bin and + # then CREATE_PRIOR_DIST_STATS files which are not generated + # for specific bins + return T050017_filename(observatories, cache_entries[0].description, seg, extension, path = path) + elif min_bin and max_bin and min_bin != max_bin: + if split_description[1].isdigit(): + description_base = split_description[2:] + else: + description_base = split_description[1:] + # Files from different bins, thus segments must be same + return T050017_filename(observatories, '_'.join([min_bin, max_bin] + description_base), seg, extension, path = path) + else: + print >>sys.stderr, "ERROR: first and last file of cache file do not match known pattern, cannot name group file under T050017 convention. \nFile 1: %s\nFile 2: %s" % (cache_entries[0].path, cache_entries[-1].path) + raise ValueError + + +if __name__ == "__main__": + import doctest + doctest.testmod()
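
Usage sketch (illustrative only, not part of the patch): the consolidated dagparts API can be exercised roughly as below, against the Python 2 era glue.pipeline stack. The DAG name "example_pipe", the executable name "gstlal_inspiral_treebank", and all option and condor values are placeholders; the sketch assumes the executable resolves on PATH via which().

# Illustrative sketch only.  Assumes gstlal.dagparts as defined in this patch
# is importable and the (placeholder) executable is on PATH.
from gstlal import dagparts

# A DAG whose .dag/.sub files are named after "example_pipe"; its log file
# lands in the directory reported by log_path() (TMPDIR is checked first).
dag = dagparts.DAG("example_pipe")

# Turn "key=value" strings (e.g. collected from --condor-command) into a
# dictionary of condor commands layered on top of some defaults.
condor_commands = dagparts.condor_command_dict_from_opts(
	["+Online_CBC_SVD=True"],
	{"request_memory": "2GB", "want_graceful_removal": "True", "kill_sig": "15"})

# One DAGJob per executable: the .sub file, the logs/ stdout and stderr paths
# and an output directory named after the executable are set up automatically.
job = dagparts.DAGJob("gstlal_inspiral_treebank", condor_commands = condor_commands)

# One DAGNode per task: the node names itself and adds itself to the dag;
# options set to None are ignored, "" yields an empty argument, and list
# values are expanded through pipeline_dot_py_append_opts_hack().
node = dagparts.DAGNode(job, dag, parent_nodes = [],
	opts = {"min-mass2": 1.0, "max-mass2": 3.0, "verbose": ""},
	output_files = {"output": "bank.xml.gz"})

# Standard glue.pipeline output files.
dag.write_sub_files()
dag.write_dag()
dag.write_script()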
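
A similarly hedged sketch of the T050017 naming helpers that move into dagparts with this patch; the file URLs, GPS times and descriptions are invented, and the names shown in the comments follow from the definitions above.

# Illustrative sketch only; file URLs, GPS times and descriptions are made up.
from lal.utils import CacheEntry
from gstlal import dagparts

# A single T050017-style name: OBSERVATORIES-DESCRIPTION-START-DURATION.EXT
# (instruments may be a string or an iterable of instrument strings).
print dagparts.T050017_filename("H1L1", "0_DIST_STATS", (1000000000, 1000004096), ".xml.gz", path = "cache")
# cache/H1L1-0_DIST_STATS-1000000000-4096.xml.gz

# A group name built from several T050017 files; both entries below are from
# the same background bin ("0"), so the first description is reused and the
# segment is the extent of the inputs.
entries = [
	CacheEntry.from_T050017("file://localhost/path/H1-0_DIST_STATS-1000000000-4096.xml.gz"),
	CacheEntry.from_T050017("file://localhost/path/L1-0_DIST_STATS-1000000000-4096.xml.gz"),
]
print dagparts.group_T050017_filename_from_T050017_files(entries, '.xml.gz', path = "cache")
# cache/H1L1-0_DIST_STATS-1000000000-4096.xml.gz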