Skip to content
Snippets Groups Projects
Commit e372a0d1 authored by Cody Messick's avatar Cody Messick
Browse files

construct_skymap_test_dag: first commit of script that will construct a dag

running offline inspiral jobs with gracedb uploads. The command line arguments
are constructed from a the process params table in an injection results
database
parent 1f9a3ff7
No related branches found
No related tags found
No related merge requests found
Pipeline #51740 failed
#!/usr/bin/env python
'''
./construct_skymap_test_dag path/to/injection/database path/to/tmp/space max_number_inspiral_jobs
'''
inj_db = sys.argv[1]
tmp_space = sys.argv[2]
num_inspiral_jobs = int(sys.argv[3])
import sqlite3
import sys
import os
from ligo.lw import dbtables
# copied from gstlal_inspiral_plotsummary
def create_sim_coinc_view(connection):
"""
Construct a sim_inspiral --> best matching coinc_event mapping.
Only injections that match at least one coinc get an entry in this
table.
"""
#
# the log likelihood ratio stored in the likelihood column of the
# coinc_event table is the ranking statistic. the "best match" is
# the coinc with the highest value in this column. although it has
# not been true in the past, there is now a one-to-one relationship
# between the value of this ranking statistic and false-alarm rate,
# therefore it is OK to order by log likelihood ratio and then,
# later, impose a "detection" threshold based on false-alarm rate.
#
connection.cursor().execute("""
CREATE TEMPORARY TABLE
sim_coinc_map_helper
AS
SELECT a.event_id as sid,
coinc_event.coinc_event_id as cid,
coinc_event.likelihood as lr
FROM coinc_event_map as a
JOIN coinc_event_map AS b ON (b.coinc_event_id == a.coinc_event_id)
JOIN coinc_event ON (coinc_event.coinc_event_id == b.event_id)
WHERE a.table_name == "sim_inspiral"
AND b.table_name == "coinc_event"
AND NOT EXISTS (SELECT * FROM time_slide WHERE time_slide.time_slide_id == coinc_event.time_slide_id AND time_slide.offset != 0);
""")
connection.cursor().execute("CREATE INDEX IF NOT EXISTS sim_coinc_map_helper_index ON sim_coinc_map_helper (sid, cid);")
connection.cursor().execute("""
CREATE TEMPORARY TABLE
sim_coinc_map
AS
SELECT
sim_inspiral.simulation_id AS simulation_id,
(
SELECT
cid
FROM
sim_coinc_map_helper
WHERE
sid = simulation_id
ORDER BY
lr
DESC
LIMIT 1
) AS coinc_event_id
FROM
sim_inspiral
WHERE
coinc_event_id IS NOT NULL;
""")
connection.cursor().execute("DROP INDEX sim_coinc_map_helper_index;")
connection.cursor().execute("""
CREATE TEMPORARY TABLE
sim_id_combined_far
AS
SELECT
coinc_inspiral.combined_far AS far, sim_coinc_map.simulation_id AS sim_id
FROM
sim_coinc_map
JOIN coinc_inspiral ON ( coinc_inspiral.coinc_event_id == sim_coinc_map.coinc_event_id )
""")
connection.cursor().execute("""
CREATE TEMPORARY TABLE
sim_id_sngl_id
AS
SELECT
sim_coinc_map.simulation_id AS sim_id, sngl_inspiral.event_id AS sngl_id
FROM
sim_coinc_map
JOIN coinc_event_map as mapA ON ( mapA.coinc_event_id == sim_coinc_map.coinc_event_id )
JOIN sngl_inspiral ON ( sngl_inspiral.event_id == mapA.event_id )
""")
connection.cursor().execute("CREATE INDEX IF NOT EXISTS sim_id_combined_far_index ON sim_id_combined_far (far, sim_id)")
connection.cursor().execute("CREATE INDEX IF NOT EXISTS sim_id_sngl_id_index ON sim_id_sngl_id (sim_id, sngl_id)")
connection.cursor().execute("""
CREATE TEMPORARY TABLE
sim_sngl_far
AS
SELECT
sim_inspiral.*,
sngl_inspiral.*,
sim_id_combined_far.far
FROM
sim_inspiral
JOIN sim_id_sngl_id ON (
sim_inspiral.simulation_id == sim_id_sngl_id.sim_id
)
JOIN sngl_inspiral ON (
sngl_inspiral.event_id == sim_id_sngl_id.sngl_id
)
JOIN sim_id_combined_far ON (
sim_id_combined_far.sim_id == sim_id_sngl_id.sim_id
)
""")
connection.cursor().execute("DROP INDEX sim_id_combined_far_index")
connection.cursor().execute("DROP INDEX sim_id_sngl_id_index")
inj_db = sys.argv[1]
tmp_space = sys.argv[2]
num_inspiral_jobs = int(sys.argv[3])
analysis_dir = os.path.dirname(inj_db)
working_filename = dbtables.get_connection_filename(inj_db, tmp_path = tmp_space, verbose = True)
connection = sqlite3.connect(working_filename)
create_sim_coinc_view(connection)
found_inj_bankid_param = {}
for process_id, bank_id in connection.cursor().execute("""
SELECT
"process_id:1", Gamma1
FROM
sim_sngl_far
WHERE
far <= 3.86e-7
ORDER BY
far ASC
"""):
try:
found_inj_bankid_param["%s_%s" % (bank_id, process_id)] += 1
except KeyError:
found_inj_bankid_param["%s_%s" % (bank_id, process_id)] = 1
inspiral_jobs_sorted_by_most_found_injections = [("%04d" % int(tup[0].split("_")[0].split(".")[0]), tup[0].split("_")[1]) for tup in sorted(found_inj_bankid_param.items(), key = lambda t: t[1], reverse = True)]
dagstr = ['JOBSTATE_LOG logs/itacac_skymap_test.jobstate.log']
subargstr = []
for job_id, (bankid, process_id) in enumerate(inspiral_jobs_sorted_by_most_found_injections[:num_inspiral_jobs], start=1):
dagstr.append("JOB gstlal_inspiral_%04x gstlal_inspiral.sub" % int(job_id))
dagstr.append("RETRY gstlal_inspiral_%04x 3" % int(job_id))
command_line_args = []
gates = []
# FIXME Need to add option for dist stats output
for param, value in connection.cursor().execute("SELECT param, value FROM process_params WHERE process_id == ?", (process_id,)):
if job_id == 1:
subargstr.append("%s $(%s)" % (str(param), str(param).replace("--", "macro").replace("-", "")))
param = str(param).replace("--", "macro").replace("-", "")
value = str(value)
if param in ("macroframecache", "macroframesegmentsfile", "macroreferencepsd", "macrotimeslidefile", "macrovetosegmentsfile", "macroinjections"):
command_line_args.append("=".join([param, '"'"%s"'"' % os.path.join(analysis_dir, value)]))
elif param == "macrooutputcache":
fname = os.path.basename(value).replace(value.split("-")[1][:9], bankid).replace(".cache", ".xml.gz")
marg_dist_stats_fname = '%s-%s_MARG_DIST_STATS-%s-%s.xml.gz' % (fname.split("-")[0], bankid, os.path.basename(inj_db).split("-")[-2], os.path.basename(inj_db).split("-")[-1].split(".")[0])
command_line_args.append('macrooutput="'"%s"'"' % os.path.join("output", fname))
command_line_args.append('macrorankingstatinput="'"%s"'"' % os.path.join(os.path.join(analysis_dir, "gstlal_inspiral_marginalize_likelihood"), marg_dist_stats_fname))
command_line_args.append('macrorankingstatoutput="'"%s"'"' % os.path.join("output", "unused.xml.gz"))
elif param == "macrosvdbankcache":
value = os.path.basename(value)
bankstrlist = []
for i in xrange(len(value.split("-")[0]) / 2):
ifo = value.split("-")[0][2*i:2*i+2]
bankstrlist.append("%s:%s" % (ifo, os.path.join(os.path.join(analysis_dir, "gstlal_svd_bank"), "-".join([ifo, "%s_SVD" % bankid, "-".join(value.split("-")[2:])]).replace(value.split("-")[1][:9], bankid).replace(".cache", ".xml.gz"))))
command_line_args.append('macrosvdbank="'"%s"'"' % ",".join(bankstrlist))
elif param == "macrohtgatethreshold":
gates.append(value)
if len(gates) == 10:
command_line_args.append("=".join([param, '"'"%s"'"' % gates[int(bankid[-1])]]))
elif value == "None":
command_line_args.append('%s="'""'"' % param)
else:
command_line_args.append("=".join([param, '"'"%s"'"' % value]))
command_line_args.append('macrolikelihoodsnapshotinterval="'"10000000"'"')
command_line_args.append('macrorankingstatpdf="'"%s"'"' % os.path.join(analysis_dir, "post_marginalized_likelihood.xml.gz"))
command_line_args.append('macrogracedbfarthreshold="'"%f"'"' % ( 1./(30*86400)))
if job_id == 1:
subargstr.append("--likelihood-snapshot-interval $(macrolikelihoodsnapshotinterval)")
subargstr.append("--ranking-stat-pdf $(macrorankingstatpdf)")
subargstr.append("--gracedb-far-threshold $(macrogracedbfarthreshold)")
dagstr.append("VARS gstlal_inspiral_%04x %s" % (int(job_id), " ".join(command_line_args)))
with open("gstlal_inspiral.sub", "w") as f:
f.write("\n".join(["universe = vanilla", "executable = /home/gstlalcbc/engineering/14/code/master_icc_190225/opt/bin/gstlal_inspiral", 'arguments = "'"%s"'"' % " ".join(subargstr), "want_graceful_removal = True", "accounting_group = ligo.dev.o3.cbc.uber.gstlaloffline", "request_memory = 5GB", "accounting_group_user = cody.messick", "getenv = True", "environment = GST_REGISTRY_UPDATE=no;", "request_cpus = 2", "kill_sig = 15", "log = /local/gstlalcbc/skymap_tests", "error = logs/$(macronodename)-$(cluster)-$(process).err", "output = logs/$(macronodename)-$(cluster)-$(process).out", "notification = never", 'Requirements = regexp("'"Intel.*v[3-5]"'", TARGET.cpuinfo_model_name)', "queue 1"]))
with open("itacac_skymap_test.dag", "w") as f:
f.write("\n".join(dagstr))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment