diff --git a/gstlal-ugly/share/construct_skymap_test_dag b/gstlal-ugly/share/construct_skymap_test_dag index c25b4be4e82ee858985cda0237d80ea4c5f2cee8..aa60a8a351e766dc9f5c92470f55df45c8a60717 100755 --- a/gstlal-ugly/share/construct_skymap_test_dag +++ b/gstlal-ugly/share/construct_skymap_test_dag @@ -2,15 +2,18 @@ ''' ./construct_skymap_test_dag path/to/injection/database path/to/tmp/space max_number_inspiral_jobs ''' -inj_db = sys.argv[1] -tmp_space = sys.argv[2] -num_inspiral_jobs = int(sys.argv[3]) import sqlite3 import sys import os +import glob +from ligo.lw import ligolw +from ligo.lw import lsctables +from ligo.lw import utils as ligolw_utils +from ligo.lw.utils import process as ligolw_process from ligo.lw import dbtables +from gstlal import dagparts # copied from gstlal_inspiral_plotsummary def create_sim_coinc_view(connection): @@ -104,9 +107,10 @@ CREATE TEMPORARY TABLE sim_sngl_far AS SELECT - sim_inspiral.*, - sngl_inspiral.*, - sim_id_combined_far.far + sngl_inspiral.process_id AS pid, + sngl_inspiral.Gamma1 AS Gamma1, + sim_id_combined_far.far AS far, + sim_inspiral.* FROM sim_inspiral JOIN sim_id_sngl_id ON ( @@ -133,71 +137,160 @@ connection = sqlite3.connect(working_filename) create_sim_coinc_view(connection) found_inj_bankid_param = {} - -for process_id, bank_id in connection.cursor().execute(""" +sim_row = {} +xmldoc = dbtables.get_xml(connection) +sim_inspiral_table = lsctables.SimInspiralTable.get_table(xmldoc) +for record in connection.cursor().execute(""" SELECT - "process_id:1", Gamma1 + * FROM sim_sngl_far WHERE far <= 3.86e-7 ORDER BY far ASC -"""): +LIMIT ? +""", (int(num_inspiral_jobs),)): + process_id = record[0] + bank_id = record[1] + far = record[2] + simrow = record[3:] try: - found_inj_bankid_param["%s_%s" % (bank_id, process_id)] += 1 + found_inj_bankid_param[(bank_id, process_id)] += 1 except KeyError: - found_inj_bankid_param["%s_%s" % (bank_id, process_id)] = 1 - -inspiral_jobs_sorted_by_most_found_injections = [("%04d" % int(tup[0].split("_")[0].split(".")[0]), tup[0].split("_")[1]) for tup in sorted(found_inj_bankid_param.items(), key = lambda t: t[1], reverse = True)] -dagstr = ['JOBSTATE_LOG logs/itacac_skymap_test.jobstate.log'] -subargstr = [] -for job_id, (bankid, process_id) in enumerate(inspiral_jobs_sorted_by_most_found_injections[:num_inspiral_jobs], start=1): - dagstr.append("JOB gstlal_inspiral_%04x gstlal_inspiral.sub" % int(job_id)) - dagstr.append("RETRY gstlal_inspiral_%04x 3" % int(job_id)) - command_line_args = [] - gates = [] + found_inj_bankid_param[(bank_id, process_id)] = 1 + sim_row.setdefault((bank_id, process_id), []).append(sim_inspiral_table.row_from_cols(simrow)) + +master_opts_dict = { + "gps-start-time":None, + "gps-end-time": None, + "psd-fft-length": 32, + "likelihood-snapshot-interval": 100000.0, + "track-psd": "", + "min-instruments": None, + "gracedb-far-threshold": 1e-6, + "gracedb-service-url": "file://%s/gracedb" % os.getcwd(), + "ht-gate-threshold": 50.0, + "veto-segments-name": "vetoes", + "fir-stride": 0.25, + "gracedb-group": "CBC", + "coincidence-threshold": 0.005, + "control-peak-time": 0, + "gracedb-pipeline": "gstlal", + "data-source": None, + "frame-segments-name": None, + "tmp-space": None, + "gracedb-search": "AllSky", + "channel-name": None, + "singles-threshold": "inf", + "verbose": "" +} + +master_input_dict = { + "reference-psd": None, + "svd-bank": None, # FIXME THIS ONE IS TRICKY + "ranking-stat-pdf": "%s/post_marginalized_likelihood.xml.gz" % analysis_dir, + "ranking-stat-input": None, # FIXME THIS ONE IS TRICKY + "veto-segments-file": None, + "frame-segments-file": None, + "frame-cache": None, + "time-slide-file": None, + "injections": None, # FIXME make this just a single injection with the correct parameters + } + +master_output_dict = { + "ranking-stat-output": "not_used.xml.gz", + "zerolag-rankingstat-pdf": "notused2.xml.gz", + "output": None, +} + +try: + os.mkdir("logs") +except: + pass +dag = dagparts.DAG("trigger_pipe") + +gstlalInspiralInjJob = dagparts.DAGJob("gstlal_inspiral", + tag_base="gstlal_inspiral_inj", + condor_commands = {"request_memory":"5gb", + "request_cpus":"2", + "want_graceful_removal":"True", + "kill_sig":"15"} + ) + +def updatedict(x, y): + for k in x: + if x[k] is None: + try: + x[k] = y[k] + except KeyError as e: + pass + +def fixrelpath(x, ys): + for y in ys: + x[y] = "%s/%s" % (analysis_dir, x[y][0]) + +def new_inj_file(rows, output): + xmldoc = ligolw.Document() + lw = xmldoc.appendChild(ligolw.LIGO_LW()) + sim_inspiral_table = lsctables.New(lsctables.SimInspiralTable) + lw.appendChild(sim_inspiral_table) + for row in rows: + sim_inspiral_table.append(row) + ligolw_utils.write_filename(xmldoc, output, gz = output.endswith('gz')) + + +try: + os.mkdir("inj_files") +except OSError: + pass + +try: + os.mkdir("lloid_files") +except OSError: + pass + +for job_id, (bankid, process_id) in enumerate(found_inj_bankid_param, start=1): # FIXME Need to add option for dist stats output + print "\n++ job_id: %s ++\n" % job_id + job_dict = {} for param, value in connection.cursor().execute("SELECT param, value FROM process_params WHERE process_id == ?", (process_id,)): - if job_id == 1: - subargstr.append("%s $(%s)" % (str(param), str(param).replace("--", "macro").replace("-", ""))) - param = str(param).replace("--", "macro").replace("-", "") - value = str(value) - if param in ("macroframecache", "macroframesegmentsfile", "macroreferencepsd", "macrotimeslidefile", "macrovetosegmentsfile", "macroinjections"): - command_line_args.append("=".join([param, '"'"%s"'"' % os.path.join(analysis_dir, value)])) - elif param == "macrooutputcache": - fname = os.path.basename(value).replace(value.split("-")[1][:9], bankid).replace(".cache", ".xml.gz") - marg_dist_stats_fname = '%s-%s_MARG_DIST_STATS-%s-%s.xml.gz' % (fname.split("-")[0], bankid, os.path.basename(inj_db).split("-")[-2], os.path.basename(inj_db).split("-")[-1].split(".")[0]) - command_line_args.append('macrooutput="'"%s"'"' % os.path.join("output", fname)) - command_line_args.append('macrorankingstatinput="'"%s"'"' % os.path.join(os.path.join(analysis_dir, "gstlal_inspiral_marginalize_likelihood"), marg_dist_stats_fname)) - command_line_args.append('macrorankingstatoutput="'"%s"'"' % os.path.join("output", "unused.xml.gz")) - elif param == "macrosvdbankcache": - value = os.path.basename(value) - bankstrlist = [] - for i in xrange(len(value.split("-")[0]) / 2): - ifo = value.split("-")[0][2*i:2*i+2] - bankstrlist.append("%s:%s" % (ifo, os.path.join(os.path.join(analysis_dir, "gstlal_svd_bank"), "-".join([ifo, "%s_SVD" % bankid, "-".join(value.split("-")[2:])]).replace(value.split("-")[1][:9], bankid).replace(".cache", ".xml.gz")))) - command_line_args.append('macrosvdbank="'"%s"'"' % ",".join(bankstrlist)) - elif param == "macrohtgatethreshold": - gates.append(value) - if len(gates) == 10: - command_line_args.append("=".join([param, '"'"%s"'"' % gates[int(bankid[-1])]])) - elif value == "None": - command_line_args.append('%s="'""'"' % param) - else: - command_line_args.append("=".join([param, '"'"%s"'"' % value])) - command_line_args.append('macrolikelihoodsnapshotinterval="'"10000000"'"') - command_line_args.append('macrorankingstatpdf="'"%s"'"' % os.path.join(analysis_dir, "post_marginalized_likelihood.xml.gz")) - command_line_args.append('macrogracedbfarthreshold="'"%f"'"' % ( 1./(30*86400))) - if job_id == 1: - subargstr.append("--likelihood-snapshot-interval $(macrolikelihoodsnapshotinterval)") - subargstr.append("--ranking-stat-pdf $(macrorankingstatpdf)") - subargstr.append("--gracedb-far-threshold $(macrogracedbfarthreshold)") - - dagstr.append("VARS gstlal_inspiral_%04x %s" % (int(job_id), " ".join(command_line_args))) - -with open("gstlal_inspiral.sub", "w") as f: - f.write("\n".join(["universe = vanilla", "executable = /home/gstlalcbc/engineering/14/code/master_icc_190225/opt/bin/gstlal_inspiral", 'arguments = "'"%s"'"' % " ".join(subargstr), "want_graceful_removal = True", "accounting_group = ligo.dev.o3.cbc.uber.gstlaloffline", "request_memory = 5GB", "accounting_group_user = cody.messick", "getenv = True", "environment = GST_REGISTRY_UPDATE=no;", "request_cpus = 2", "kill_sig = 15", "log = /local/gstlalcbc/skymap_tests", "error = logs/$(macronodename)-$(cluster)-$(process).err", "output = logs/$(macronodename)-$(cluster)-$(process).out", "notification = never", 'Requirements = regexp("'"Intel.*v[3-5]"'", TARGET.cpuinfo_model_name)', "queue 1"])) - -with open("itacac_skymap_test.dag", "w") as f: - f.write("\n".join(dagstr)) + job_dict.setdefault(param.replace("--",""), []).append(value) + this_opts_dict = master_opts_dict.copy() + updatedict(this_opts_dict, job_dict) + this_input_dict = master_input_dict.copy() + updatedict(this_input_dict, job_dict) + this_output_dict = master_output_dict.copy() + updatedict(this_output_dict, job_dict) + + # FIX some stuff + fixrelpath(this_input_dict, ("reference-psd", "frame-cache", "time-slide-file", "veto-segments-file", "frame-segments-file")) + + # make a custom injection file + inj_file_name = "inj_files/%d_%d_%d_inj.xml.gz" % (job_id, bankid, process_id) + new_inj_file(sim_row[(bank_id, process_id)], inj_file_name) + this_input_dict["injections"] = inj_file_name + + # FIXME hacks for the svd + instruments = [x.split("=")[0] for x in this_opts_dict["channel-name"]] + banks = ["%s:%s" % (ifo, glob.glob("%s/gstlal_svd_bank/%s-%04d_SVD*" % (analysis_dir, ifo, bankid))[0]) for ifo in instruments] + this_input_dict["svd-bank"] = ",".join(banks) + + # FIXME don't hardcode H1L1V1 + ranking_stat_pdf = glob.glob("%s/gstlal_inspiral_marginalize_likelihood/H1L1V1-%04d_MARG_DIST_STATS*" % (analysis_dir, bankid))[0] + this_input_dict["ranking-stat-input"] = ranking_stat_pdf + + # just name the output the same as the input + try: + os.mkdir("lloid_files/%d_%d_%d" % (job_id, bankid, process_id)) + except OSError: + pass + + output_file_name = "lloid_files/%d_%d_%d/%d_%d_%d_lloid.xml.gz" % (job_id, bankid, process_id, job_id, bankid, process_id) + this_output_dict["output"] = output_file_name + + dagparts.DAGNode(gstlalInspiralInjJob, dag, parent_nodes = [], opts = this_opts_dict, input_files = this_input_dict, output_files = this_output_dict) + +dag.write_sub_files() +dag.write_dag() +dag.write_script()