Newer
Older
#!/usr/bin/env python
'''
construct_skymap_test_dag --injection-database=path/to/injection/database --tmp-space=path/to/tmp/space --far-threshold=1e-7 --max-number-inspiral-jobs=max_number_inspiral_jobs
'''
from optparse import OptionParser
import sqlite3
import os
from ligo.lw import ligolw
from ligo.lw import lsctables
from ligo.lw import utils as ligolw_utils
from ligo.lw.utils import process as ligolw_process
from ligo.lw import dbtables
def parse_command_line():
parser = OptionParser()
parser.add_option("-i", "--injection-database", metavar = "filename", help = "Path to the injection database.")
parser.add_option("-t", "--tmp-space", metavar = "path", help = "Path to temp space. Defaults to _CONDOR_SCRATCH_DIR .")
parser.add_option("-f", "--far-threshold", type = "float", default = 3.86e-7, help = "Threshold for rerunning an injection job to produce the time series.")
parser.add_option("-m", "--max-number-inspiral-jobs", type = "int", default = 10000, help = "Maximum number of inspiral jobs to queue. If this option is not provided, the program defaults to the top 10000 that pass the imposed FAR threshold.")
options, filenames = parser.parse_args()
return options, filenames
options, filenames = parse_command_line()
if not options.injection_database:
raise ValueError("You must provide an injection database.")
if not options.tmp_space:
if '_CONDOR_SCRATCH_DIR' in os.environ:
options.tmp_space = os.environ['_CONDOR_SCRATCH_DIR']
else:
options.tmp_space = os.environ['TMPDIR']
# copied from gstlal_inspiral_plotsummary
def create_sim_coinc_view(connection):
"""
Construct a sim_inspiral --> best matching coinc_event mapping.
Only injections that match at least one coinc get an entry in this
table.
"""
#
# the log likelihood ratio stored in the likelihood column of the
# coinc_event table is the ranking statistic. the "best match" is
# the coinc with the highest value in this column. although it has
# not been true in the past, there is now a one-to-one relationship
# between the value of this ranking statistic and false-alarm rate,
# therefore it is OK to order by log likelihood ratio and then,
# later, impose a "detection" threshold based on false-alarm rate.
#
connection.cursor().execute("""
CREATE TEMPORARY TABLE
sim_coinc_map_helper
AS
SELECT a.event_id as sid,
coinc_event.coinc_event_id as cid,
coinc_event.likelihood as lr,
coinc_event.nevents as nevents,
coinc_event.instruments as instruments
FROM coinc_event_map as a
JOIN coinc_event_map AS b ON (b.coinc_event_id == a.coinc_event_id)
JOIN coinc_event ON (coinc_event.coinc_event_id == b.event_id)
WHERE a.table_name == "sim_inspiral"
AND b.table_name == "coinc_event"
AND NOT EXISTS (SELECT * FROM time_slide WHERE time_slide.time_slide_id == coinc_event.time_slide_id AND time_slide.offset != 0);
""")
connection.cursor().execute("CREATE INDEX IF NOT EXISTS sim_coinc_map_helper_index ON sim_coinc_map_helper (sid, cid);")
#
# FIXME NOTE FIXME this selects triple times with no more than two ifos to test the itacac sub threshold stuff
#
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
connection.cursor().execute("""
CREATE TEMPORARY TABLE
sim_coinc_map
AS
SELECT
sim_inspiral.simulation_id AS simulation_id,
(
SELECT
cid
FROM
sim_coinc_map_helper
WHERE
sid = simulation_id
ORDER BY
lr
DESC
LIMIT 1
) AS coinc_event_id
FROM
sim_inspiral
WHERE
coinc_event_id IS NOT NULL;
""")
connection.cursor().execute("DROP INDEX sim_coinc_map_helper_index;")
connection.cursor().execute("""
CREATE TEMPORARY TABLE
sim_id_combined_far
AS
SELECT
coinc_inspiral.combined_far AS far, sim_coinc_map.simulation_id AS sim_id
FROM
sim_coinc_map
JOIN coinc_inspiral ON ( coinc_inspiral.coinc_event_id == sim_coinc_map.coinc_event_id )
""")
connection.cursor().execute("""
CREATE TEMPORARY TABLE
sim_id_sngl_id
AS
SELECT
sim_coinc_map.simulation_id AS sim_id, sngl_inspiral.event_id AS sngl_id
FROM
sim_coinc_map
JOIN coinc_event_map as mapA ON ( mapA.coinc_event_id == sim_coinc_map.coinc_event_id )
JOIN sngl_inspiral ON ( sngl_inspiral.event_id == mapA.event_id )
""")
connection.cursor().execute("CREATE INDEX IF NOT EXISTS sim_id_combined_far_index ON sim_id_combined_far (far, sim_id)")
connection.cursor().execute("CREATE INDEX IF NOT EXISTS sim_id_sngl_id_index ON sim_id_sngl_id (sim_id, sngl_id)")
connection.cursor().execute("""
CREATE TEMPORARY TABLE
sim_sngl_far
AS
SELECT
sngl_inspiral.process_id AS pid,
sngl_inspiral.Gamma1 AS Gamma1,
sim_id_combined_far.far AS far,
sim_inspiral.simulation_id AS simulation_id,
FROM
sim_inspiral
JOIN sim_id_sngl_id ON (
sim_inspiral.simulation_id == sim_id_sngl_id.sim_id
)
JOIN sngl_inspiral ON (
sngl_inspiral.event_id == sim_id_sngl_id.sngl_id
)
JOIN sim_id_combined_far ON (
sim_id_combined_far.sim_id == sim_id_sngl_id.sim_id
)
""")
connection.cursor().execute("DROP INDEX sim_id_combined_far_index")
connection.cursor().execute("DROP INDEX sim_id_sngl_id_index")
inj_db = options.injection_database
tmp_space = options.tmp_space
num_inspiral_jobs = options.max_number_inspiral_jobs
analysis_dir = os.path.dirname(inj_db)
working_filename = dbtables.get_connection_filename(inj_db, tmp_path = tmp_space, verbose = True)
connection = sqlite3.connect(working_filename)
create_sim_coinc_view(connection)
sim_row = {}
xmldoc = dbtables.get_xml(connection)
sim_inspiral_table = lsctables.SimInspiralTable.get_table(xmldoc)
for record in connection.cursor().execute("""
SELECT
FROM
sim_sngl_far
WHERE
ORDER BY
far ASC
""" % (options.far_threshold,), (int(num_inspiral_jobs),)):
process_id = record[0]
bank_id = record[1]
far = record[2]
simid = record[3]
simrow = record[4:]
sim_row[(bank_id, process_id, simid)] = sim_inspiral_table.row_from_cols(simrow)
master_opts_dict = {
"gps-start-time":None,
"gps-end-time": None,
"psd-fft-length": 32,
"likelihood-snapshot-interval": 100000.0,
"track-psd": "",
"min-instruments": None,
"gracedb-far-threshold": 1e-6,
"ht-gate-threshold": 50.0,
"veto-segments-name": "vetoes",
"fir-stride": 0.25,
"gracedb-group": "CBC",
"coincidence-threshold": 0.005,
"control-peak-time": 0,
"gracedb-pipeline": "gstlal",
"data-source": None,
"frame-segments-name": None,
"tmp-space": None,
"gracedb-search": "AllSky",
"channel-name": None,
"singles-threshold": "inf",
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
"verbose": ""
}
master_input_dict = {
"reference-psd": None,
"svd-bank": None, # FIXME THIS ONE IS TRICKY
"ranking-stat-pdf": "%s/post_marginalized_likelihood.xml.gz" % analysis_dir,
"ranking-stat-input": None, # FIXME THIS ONE IS TRICKY
"veto-segments-file": None,
"frame-segments-file": None,
"frame-cache": None,
"time-slide-file": None,
"injections": None, # FIXME make this just a single injection with the correct parameters
}
master_output_dict = {
"ranking-stat-output": "not_used.xml.gz",
"zerolag-rankingstat-pdf": "notused2.xml.gz",
"output": None,
}
try:
os.mkdir("logs")
except:
pass
dag = dagparts.DAG("trigger_pipe")
gstlalInspiralInjJob = dagparts.DAGJob("gstlal_inspiral",
tag_base="gstlal_inspiral_inj",
condor_commands = {"request_memory":"5gb",
"request_cpus":"2",
"want_graceful_removal":"True",
"kill_sig":"15"}
)
noIlwdcharJob = dagparts.DAGJob("ligolw_no_ilwdchar")
bashJob = dagparts.DAGJob("bash")
def updatedict(x, y):
for k in x:
if x[k] is None:
try:
x[k] = y[k]
except KeyError as e:
pass
def fixrelpath(x, ys):
for y in ys:
x[y] = "%s/%s" % (analysis_dir, x[y][0])
xmldoc = ligolw.Document()
lw = xmldoc.appendChild(ligolw.LIGO_LW())
sim_inspiral_table = lsctables.New(lsctables.SimInspiralTable)
lw.appendChild(sim_inspiral_table)
ligolw_utils.write_filename(xmldoc, output, gz = output.endswith('gz'))
try:
os.mkdir("inj_files")
except OSError:
pass
try:
os.mkdir("lloid_files")
except OSError:
pass
mass_model = analysis_dir + '/gstlal_inspiral_mass_model'
try:
os.mkdir("gstlal_inspiral_mass_model")
os.system('cp -r %s .' % (mass_model,))
except OSError:
pass
bayesdir = "bayestar_input"
try:
os.mkdir(bayesdir)
except OSError:
pass
f = open("process.sh", "w")
f.write("""#!/usr/bin/bash
ID=$1
FILE=$(gstlal_inspiral_best_coinc_file lloid_files/${ID}/*CBC_AllSky-*-0.xml)
ligolw_no_ilwdchar ${FILE}
mkdir -p bayestar_input/${ID}/
gstlal_ligolw_add_without_reassign ${FILE} inj_files/${ID}_inj.xml.gz --output bayestar_input/${ID}/${ID}_event.xml.gz
lalapps_inspinjfind --time-window 0.9 bayestar_input/${ID}/${ID}_event.xml.gz
gstlal_ilwdify bayestar_input/${ID}/${ID}_event.xml.gz
""")
f.close()
for job_id, (bankid, process_id, simid) in enumerate(sim_row, start=1):
# FIXME Need to add option for dist stats output
for param, value in connection.cursor().execute("SELECT param, value FROM process_params WHERE process_id == ?", (process_id,)):
job_dict.setdefault(param.replace("--",""), []).append(value)
this_opts_dict = master_opts_dict.copy()
updatedict(this_opts_dict, job_dict)
this_input_dict = master_input_dict.copy()
updatedict(this_input_dict, job_dict)
this_output_dict = master_output_dict.copy()
updatedict(this_output_dict, job_dict)
# FIX some stuff
fixrelpath(this_input_dict, ("reference-psd", "frame-cache", "time-slide-file", "veto-segments-file", "frame-segments-file"))
# make a custom injection file
inj_file_name = "inj_files/%d_%d_%d_inj.xml.gz" % (job_id, bankid, process_id)
new_inj_file(sim_row[(bankid, process_id, simid)], inj_file_name)
this_input_dict["injections"] = inj_file_name
# FIXME hacks for the svd
instruments = [x.split("=")[0] for x in this_opts_dict["channel-name"]]
banks = ["%s:%s" % (ifo, glob.glob("%s/gstlal_svd_bank/%s-%04d_SVD*" % (analysis_dir, ifo, bankid))[0]) for ifo in instruments]
this_input_dict["svd-bank"] = ",".join(banks)
# FIXME don't hardcode H1L1V1
ranking_stat_pdf = glob.glob("%s/gstlal_inspiral_marginalize_likelihood/H1L1V1-%04d_MARG_DIST_STATS*" % (analysis_dir, bankid))[0]
this_input_dict["ranking-stat-input"] = ranking_stat_pdf
# just name the output the same as the input
outdir = "lloid_files/%d_%d_%d" % (job_id, bankid, process_id)
output_file_name = "%s/%d_%d_%d_lloid.xml.gz" % (outdir, job_id, bankid, process_id)
this_output_dict["output"] = output_file_name
this_opts_dict["gracedb-service-url"] = "file://%s/%s" % (os.getcwd(), outdir)
node = dagparts.DAGNode(gstlalInspiralInjJob, dag, parent_nodes = [], opts = this_opts_dict, input_files = this_input_dict, output_files = this_output_dict)
# all the remaining post processing
node = dagparts.DAGNode(bashJob, dag, parent_nodes = [node], opts = {"":["process.sh", "%d_%d_%d" % (job_id, bankid, process_id)]})
dag.write_sub_files()
dag.write_dag()
dag.write_script()