From 2188337fffd058e904775ffc110504049d38c37e Mon Sep 17 00:00:00 2001 From: Chad Hanna <chad.hanna@ligo.org> Date: Mon, 4 Nov 2019 20:54:07 +0000 Subject: [PATCH] broken (cherry picked from commit a4b2ce59fdb0a348c26ecf684cb7c623a0baa025) --- .../bin/gstlal_inspiral_coinc_extractor_dag | 498 ++++++++++++++++++ 1 file changed, 498 insertions(+) create mode 100755 gstlal-ugly/bin/gstlal_inspiral_coinc_extractor_dag diff --git a/gstlal-ugly/bin/gstlal_inspiral_coinc_extractor_dag b/gstlal-ugly/bin/gstlal_inspiral_coinc_extractor_dag new file mode 100755 index 0000000000..c5fe3682a1 --- /dev/null +++ b/gstlal-ugly/bin/gstlal_inspiral_coinc_extractor_dag @@ -0,0 +1,498 @@ +#!/usr/bin/env python +''' +gstlal_inspiral_coinc_extractor_dag --injection-database=path/to/injection/database --zerolag-database=path/to/zerolag/database --tmp-space=path/to/tmp/space --far-threshold=1e-6 --max-number-inspiral-jobs=max_number_inspiral_jobs +''' + +from optparse import OptionParser +import sqlite3 +import os +import glob + +from ligo.lw import ligolw +from ligo.lw import lsctables +from ligo.lw import utils as ligolw_utils +from ligo.lw.utils import process as ligolw_process +from ligo.lw import dbtables +from gstlal import dagparts + +def parse_command_line(): + parser = OptionParser() + parser.add_option("-i", "--injection-database", metavar = "filename", help = "Path to the injection database.") + parser.add_option("-z", "--zerolag-database", metavar = "filename", help = "Path to the zerolag database.") + parser.add_option("-t", "--tmp-space", metavar = "path", help = "Path to temp space. Defaults to _CONDOR_SCRATCH_DIR .") + parser.add_option("-f", "--far-threshold", type = "float", default = 3.86e-7, help = "Threshold for rerunning an injection job to produce the time series.") + parser.add_option("-m", "--max-number-inspiral-jobs", type = "int", default = 10000, help = "Maximum number of inspiral jobs to queue. If this option is not provided, the program defaults to the top 10000 that pass the imposed FAR threshold.") + + options, filenames = parser.parse_args() + + return options, filenames + +options, filenames = parse_command_line() + +if not (options.injection_database or options.zerolag_database): + raise ValueError("You must provide a database.") + +if options.injection_database and options.zerolag_database: + raise ValueError("You must provide either an injection or a zerolag a database...just make up your mind!!") + +if not options.tmp_space: + if '_CONDOR_SCRATCH_DIR' in os.environ: + options.tmp_space = os.environ['_CONDOR_SCRATCH_DIR'] + else: + options.tmp_space = os.environ['TMPDIR'] + + + +# copied from gstlal_inspiral_plotsummary +def create_sim_coinc_view(connection): + """ + Construct a sim_inspiral --> best matching coinc_event mapping. + Only injections that match at least one coinc get an entry in this + table. + """ + # + # the log likelihood ratio stored in the likelihood column of the + # coinc_event table is the ranking statistic. the "best match" is + # the coinc with the highest value in this column. although it has + # not been true in the past, there is now a one-to-one relationship + # between the value of this ranking statistic and false-alarm rate, + # therefore it is OK to order by log likelihood ratio and then, + # later, impose a "detection" threshold based on false-alarm rate. + # + + connection.cursor().execute(""" +CREATE TEMPORARY TABLE + sim_coinc_map_helper +AS + SELECT a.event_id as sid, + coinc_event.coinc_event_id as cid, + coinc_event.likelihood as lr, + coinc_event.nevents as nevents, + coinc_event.instruments as instruments + FROM coinc_event_map as a + JOIN coinc_event_map AS b ON (b.coinc_event_id == a.coinc_event_id) + JOIN coinc_event ON (coinc_event.coinc_event_id == b.event_id) + WHERE a.table_name == "sim_inspiral" + AND b.table_name == "coinc_event" + AND NOT EXISTS (SELECT * FROM time_slide WHERE time_slide.time_slide_id == coinc_event.time_slide_id AND time_slide.offset != 0); + """) + + connection.cursor().execute("CREATE INDEX IF NOT EXISTS sim_coinc_map_helper_index ON sim_coinc_map_helper (sid, cid);") + +# +# FIXME NOTE FIXME this selects triple times with no more than two ifos to test the itacac sub threshold stuff +# + + connection.cursor().execute(""" +CREATE TEMPORARY TABLE + sim_coinc_map +AS + SELECT + sim_inspiral.simulation_id AS simulation_id, + ( + SELECT + cid + FROM + sim_coinc_map_helper + WHERE + sid = simulation_id + ORDER BY + lr + DESC + LIMIT 1 + ) AS coinc_event_id + FROM + sim_inspiral + WHERE + coinc_event_id IS NOT NULL; + + """) + + connection.cursor().execute("DROP INDEX sim_coinc_map_helper_index;") + + connection.cursor().execute(""" +CREATE TEMPORARY TABLE + sim_id_combined_far +AS + SELECT + coinc_inspiral.combined_far AS far, sim_coinc_map.simulation_id AS sim_id + FROM + sim_coinc_map + JOIN coinc_inspiral ON ( coinc_inspiral.coinc_event_id == sim_coinc_map.coinc_event_id ) + """) + + connection.cursor().execute(""" +CREATE TEMPORARY TABLE + sim_id_sngl_id + AS + SELECT + sim_coinc_map.simulation_id AS sim_id, sngl_inspiral.event_id AS sngl_id + FROM + sim_coinc_map + JOIN coinc_event_map as mapA ON ( mapA.coinc_event_id == sim_coinc_map.coinc_event_id ) + JOIN sngl_inspiral ON ( sngl_inspiral.event_id == mapA.event_id ) + """) + + connection.cursor().execute("CREATE INDEX IF NOT EXISTS sim_id_combined_far_index ON sim_id_combined_far (far, sim_id)") + connection.cursor().execute("CREATE INDEX IF NOT EXISTS sim_id_sngl_id_index ON sim_id_sngl_id (sim_id, sngl_id)") + + connection.cursor().execute(""" +CREATE TEMPORARY TABLE + sim_sngl_far + AS + SELECT + sngl_inspiral.process_id AS pid, + sngl_inspiral.Gamma1 AS Gamma1, + sim_id_combined_far.far AS far, + sim_inspiral.simulation_id AS simulation_id, + sim_inspiral.* + FROM + sim_inspiral + JOIN sim_id_sngl_id ON ( + sim_inspiral.simulation_id == sim_id_sngl_id.sim_id + ) + JOIN sngl_inspiral ON ( + sngl_inspiral.event_id == sim_id_sngl_id.sngl_id + ) + JOIN sim_id_combined_far ON ( + sim_id_combined_far.sim_id == sim_id_sngl_id.sim_id + ) + """) + + connection.cursor().execute("DROP INDEX sim_id_combined_far_index") + connection.cursor().execute("DROP INDEX sim_id_sngl_id_index") + +def create_coinc_view(connection): + """ + Construct a sngl_inspiral --> best matching coinc_event mapping. + Only triggers that match at least one coinc get an entry in this + table. + """ + # + # the log likelihood ratio stored in the likelihood column of the + # coinc_event table is the ranking statistic. the "best match" is + # the coinc with the highest value in this column. although it has + # not been true in the past, there is now a one-to-one relationship + # between the value of this ranking statistic and false-alarm rate, + # therefore it is OK to order by log likelihood ratio and then, + # later, impose a "detection" threshold based on false-alarm rate. + # + + connection.cursor().execute(""" +CREATE TEMPORARY TABLE + sngl_coinc_map_helper +AS + SELECT a.event_id as sid, + coinc_event.coinc_event_id as cid, + coinc_event.likelihood as lr, + coinc_event.nevents as nevents, + coinc_event.instruments as instruments + FROM coinc_event_map as a + JOIN coinc_event_map AS b ON (b.coinc_event_id == a.coinc_event_id) + JOIN coinc_event ON (coinc_event.coinc_event_id == b.event_id) + WHERE a.table_name == "sngl_inspiral" + AND NOT EXISTS (SELECT * FROM time_slide WHERE time_slide.time_slide_id == coinc_event.time_slide_id AND time_slide.offset != 0); + """) + + connection.cursor().execute("CREATE INDEX IF NOT EXISTS sngl_coinc_map_helper_index ON sngl_coinc_map_helper (sid, cid);") + +# +# FIXME NOTE FIXME this selects triple times with no more than two ifos to test the itacac sub threshold stuff +# + + connection.cursor().execute(""" +CREATE TEMPORARY TABLE + sngl_coinc_map +AS + SELECT + sngl_inspiral.event_id AS event_id, + ( + SELECT + cid + FROM + sngl_coinc_map_helper + WHERE + sid = event_id + ORDER BY + lr + DESC + LIMIT 1 + ) AS coinc_event_id + FROM + sngl_inspiral + WHERE + coinc_event_id IS NOT NULL; + + """) + + connection.cursor().execute("DROP INDEX sngl_coinc_map_helper_index;") + + connection.cursor().execute(""" +CREATE TEMPORARY TABLE + sngl_far +AS + SELECT + sngl_inspiral.process_id AS pid, + sngl_inspiral.Gamma1 AS Gamma1, + coinc_inspiral.combined_far AS far, + sngl_inspiral.event_id AS event_id, + sngl_inspiral.* + + FROM + sngl_coinc_map + JOIN coinc_inspiral ON ( coinc_inspiral.coinc_event_id == sngl_coinc_map.coinc_event_id ) + JOIN sngl_inspiral ON ( sngl_inspiral.event_id == sngl_coinc_map.event_id ) + """) + +if options.injection_database: + db = options.injection_database +else: + db = options.zerolag_database + +tmp_space = options.tmp_space +num_inspiral_jobs = options.max_number_inspiral_jobs +analysis_dir = os.path.dirname(db) + +working_filename = dbtables.get_connection_filename(db, tmp_path = tmp_space, verbose = True) +connection = sqlite3.connect(working_filename) + +if options.injection_database: + create_sim_coinc_view(connection) + s_row = {} + xmldoc = dbtables.get_xml(connection) + inspiral_table = lsctables.SimInspiralTable.get_table(xmldoc) + for record in connection.cursor().execute(""" + SELECT + * + FROM + sim_sngl_far + WHERE + far <= %e + ORDER BY + far ASC + LIMIT ? + """ % (options.far_threshold,), (int(num_inspiral_jobs),)): + process_id = record[0] + bank_id = record[1] + far = record[2] + sid = record[3] + srow = record[4:] + s_row[(bank_id, process_id, sid)] = inspiral_table.row_from_cols(srow) + +elif options.zerolag_database: + create_coinc_view(connection) + s_row = {} + xmldoc = dbtables.get_xml(connection) + inspiral_table = lsctables.SnglInspiralTable.get_table(xmldoc) + for record in connection.cursor().execute(""" + SELECT + * + FROM + sngl_far + WHERE + far <= %e + ORDER BY + far ASC + LIMIT ? + """ % (options.far_threshold,), (int(num_inspiral_jobs),)): + process_id = record[0] + print "process id", process_id + bank_id = record[1] + far = record[2] + sid = record[3] + srow = record[4:] + s_row[(bank_id, process_id, sid)] = inspiral_table.row_from_cols(srow) + +master_opts_dict = { + "gps-start-time":None, + "gps-end-time": None, + "psd-fft-length": None, + "likelihood-snapshot-interval": 100000.0, + "track-psd": "", + "min-instruments": None, + "gracedb-far-threshold": 1e-6, + "gracedb-service-url": None, + "ht-gate-threshold": 50.0, + "veto-segments-name": "vetoes", + "fir-stride": 0.25, + "gracedb-group": "CBC", + "coincidence-threshold": 0.005, + "control-peak-time": 0, + "gracedb-pipeline": "gstlal", + "data-source": None, + "frame-segments-name": None, + "tmp-space": None, + "gracedb-search": "AllSky", + "channel-name": None, + "singles-threshold": "inf", + "local-frame-caching":"", + "verbose": "" +} + +master_input_dict = { + "reference-psd": None, + "svd-bank": None, # FIXME THIS ONE IS TRICKY + "ranking-stat-pdf": "%s/post_marginalized_likelihood.xml.gz" % analysis_dir, + "ranking-stat-input": None, # FIXME THIS ONE IS TRICKY + "veto-segments-file": None, + "frame-segments-file": None, + "frame-cache": None, + "time-slide-file": None, + "injections": None, # FIXME make this just a single injection with the correct parameters + } + +master_output_dict = { + "ranking-stat-output": "not_used.xml.gz", + "zerolag-rankingstat-pdf": "notused2.xml.gz", + "output": None, +} + +try: + os.mkdir("logs") +except: + pass +dag = dagparts.DAG("trigger_pipe") + +gstlalInspiralInjJob = dagparts.DAGJob("gstlal_inspiral", + tag_base="gstlal_inspiral_inj", + condor_commands = {"request_memory":"5gb", + "request_cpus":"2", + "want_graceful_removal":"True", + "kill_sig":"15"} + ) + +gstlalInspiralJob = dagparts.DAGJob("gstlal_inspiral", + tag_base="gstlal_inspiral", + condor_commands = {"request_memory":"5gb", + "request_cpus":"2", + "want_graceful_removal":"True", + "kill_sig":"15"} + ) + +noIlwdcharJob = dagparts.DAGJob("ligolw_no_ilwdchar") +bashJob = dagparts.DAGJob("bash") + + +def updatedict(x, y): + for k in x: + if x[k] is None: + try: + x[k] = y[k] + except KeyError as e: + pass + +def fixrelpath(x, ys): + for y in ys: + x[y] = "%s/%s" % (analysis_dir, x[y][0]) + +def new_inj_file(row, output): + xmldoc = ligolw.Document() + lw = xmldoc.appendChild(ligolw.LIGO_LW()) + sim_inspiral_table = lsctables.New(lsctables.SimInspiralTable) + lw.appendChild(sim_inspiral_table) + sim_inspiral_table.append(row) + ligolw_utils.write_filename(xmldoc, output, gz = output.endswith('gz')) + +def new_file(row, output): + xmldoc = ligolw.Document() + lw = xmldoc.appendChild(ligolw.LIGO_LW()) + inspiral_table = lsctables.New(lsctables.SnglInspiralTable) + lw.appendChild(inspiral_table) + inspiral_table.append(row) + ligolw_utils.write_filename(xmldoc, output, gz = output.endswith('gz')) + + +try: + os.mkdir("inj_files") +except OSError: + pass + +try: + os.mkdir("lloid_files") +except OSError: + pass + +mass_model = analysis_dir + '/gstlal_inspiral_mass_model' +try: + os.mkdir("gstlal_inspiral_mass_model") + os.system('cp -r %s .' % (mass_model,)) +except OSError: + pass + +bayesdir = "bayestar_input" +try: + os.mkdir(bayesdir) +except OSError: + pass + + +f = open("process.sh", "w") +f.write("""#!/usr/bin/bash +ID=$1 +FILE=$(gstlal_inspiral_best_coinc_file lloid_files/${ID}/*CBC_AllSky-*-0.xml) +ligolw_no_ilwdchar ${FILE} +mkdir -p bayestar_input/${ID}/ +gstlal_ligolw_add_without_reassign ${FILE} inj_files/${ID}_inj.xml.gz --output bayestar_input/${ID}/${ID}_event.xml.gz +lalapps_inspinjfind --time-window 0.9 bayestar_input/${ID}/${ID}_event.xml.gz +gstlal_ilwdify bayestar_input/${ID}/${ID}_event.xml.gz +""") +f.close() + +for job_id, (bankid, process_id, sid) in enumerate(s_row, start=1): + # FIXME Need to add option for dist stats output + print "++ job_id: %s ++" % job_id + print dir(s_row[(bankid, process_id, sid)]) + job_dict = {} + for param, value in connection.cursor().execute("SELECT param, value FROM process_params WHERE process_id == ?", (process_id,)): + job_dict.setdefault(param.replace("--",""), []).append(value) + this_opts_dict = master_opts_dict.copy() + updatedict(this_opts_dict, job_dict) + this_input_dict = master_input_dict.copy() + updatedict(this_input_dict, job_dict) + this_output_dict = master_output_dict.copy() + updatedict(this_output_dict, job_dict) + + # FIX some stuff + fixrelpath(this_input_dict, ("reference-psd", "frame-cache", "time-slide-file", "veto-segments-file", "frame-segments-file")) + + # make a custom injection file + if options.injection_database: + inj_file_name = "inj_files/%d_%d_%d_inj.xml.gz" % (job_id, bankid, process_id) + new_inj_file(s_row[(bankid, process_id, sid)], inj_file_name) + this_input_dict["injections"] = inj_file_name + else: + coal_time = s_row[(bankid, process_id, sid)].end_time + this_input_dict["reconstruction-segment"] = "%d:%d" % (coal_time - 3, coal_time + 3) + # FIXME hacks for the svd + instruments = [x.split("=")[0] for x in this_opts_dict["channel-name"]] + banks = ["%s:%s" % (ifo, glob.glob("%s/gstlal_svd_bank/%s-%04d_SVD*" % (analysis_dir, ifo, bankid))[0]) for ifo in instruments] + this_input_dict["svd-bank"] = ",".join(banks) + + # FIXME don't hardcode H1L1V1 + ranking_stat_pdf = glob.glob("%s/gstlal_inspiral_marginalize_likelihood/H1L1V1-%04d_MARG_DIST_STATS*" % (analysis_dir, bankid))[0] + this_input_dict["ranking-stat-input"] = ranking_stat_pdf + + # just name the output the same as the input + outdir = "lloid_files/%d_%d_%d" % (job_id, bankid, process_id) + try: + os.mkdir(outdir) + except OSError: + pass + + output_file_name = "%s/%d_%d_%d_lloid.xml.gz" % (outdir, job_id, bankid, process_id) + this_output_dict["output"] = output_file_name + + this_opts_dict["gracedb-service-url"] = "file://%s/%s" % (os.getcwd(), outdir) + + if options.injection_database: + node = dagparts.DAGNode(gstlalInspiralInjJob, dag, parent_nodes = [], opts = this_opts_dict, input_files = this_input_dict, output_files = this_output_dict) + if options.zerolag_database: + node = dagparts.DAGNode(gstlalInspiralJob, dag, parent_nodes = [], opts = this_opts_dict, input_files = this_input_dict, output_files = this_output_dict) + + # all the remaining post processing + node = dagparts.DAGNode(bashJob, dag, parent_nodes = [node], opts = {"":["process.sh", "%d_%d_%d" % (job_id, bankid, process_id)]}) + + +dag.write_sub_files() +dag.write_dag() +dag.write_script() -- GitLab