Skip to content
Snippets Groups Projects
construct_skymap_test_dag 11.1 KiB
Newer Older
construct_skymap_test_dag --injection-database=path/to/injection/database --tmp-space=path/to/tmp/space --far-threshold=1e-7 --max-number-inspiral-jobs=max_number_inspiral_jobs
from optparse import OptionParser
import glob
from ligo.lw import ligolw
from ligo.lw import lsctables
from ligo.lw import utils as ligolw_utils
from ligo.lw.utils import process as ligolw_process
from gstlal import dagparts
def parse_command_line():
	parser = OptionParser()
	parser.add_option("-i", "--injection-database", metavar = "filename", help = "Path to the injection database.")
	parser.add_option("-t", "--tmp-space", metavar = "path", help = "Path to temp space. Defaults to _CONDOR_SCRATCH_DIR .")
	parser.add_option("-f", "--far-threshold", type = "float", default = 3.86e-7, help = "Threshold for rerunning an injection job to produce the time series.")
	parser.add_option("-m", "--max-number-inspiral-jobs", type = "int", default = 10000, help = "Maximum number of inspiral jobs to queue. If this option is not provided, the program defaults to the top 10000 that pass the imposed FAR threshold.")

	options, filenames = parser.parse_args()

	return options, filenames

options, filenames = parse_command_line()

if not options.injection_database:
	raise ValueError("You must provide an injection database.")

if not options.tmp_space:
	if '_CONDOR_SCRATCH_DIR' in os.environ:
		options.tmp_space = os.environ['_CONDOR_SCRATCH_DIR']
	else:
		options.tmp_space = os.environ['TMPDIR']



# copied from gstlal_inspiral_plotsummary
def create_sim_coinc_view(connection):
	"""
	Construct a sim_inspiral --> best matching coinc_event mapping.
	Only injections that match at least one coinc get an entry in this
	table.
	"""
	#
	# the log likelihood ratio stored in the likelihood column of the
	# coinc_event table is the ranking statistic.  the "best match" is
	# the coinc with the highest value in this column.  although it has
	# not been true in the past, there is now a one-to-one relationship
	# between the value of this ranking statistic and false-alarm rate,
	# therefore it is OK to order by log likelihood ratio and then,
	# later, impose a "detection" threshold based on false-alarm rate.
	#

	connection.cursor().execute("""
CREATE TEMPORARY TABLE
	sim_coinc_map_helper
AS
	SELECT a.event_id as sid,
		coinc_event.coinc_event_id as cid,
		coinc_event.likelihood as lr,
		coinc_event.nevents as nevents,
		coinc_event.instruments as instruments
	FROM coinc_event_map as a
		JOIN coinc_event_map AS b ON (b.coinc_event_id == a.coinc_event_id)
		JOIN coinc_event ON (coinc_event.coinc_event_id == b.event_id)
	WHERE a.table_name == "sim_inspiral"
		AND b.table_name == "coinc_event"
		AND NOT EXISTS (SELECT * FROM time_slide WHERE time_slide.time_slide_id == coinc_event.time_slide_id AND time_slide.offset != 0);
	""")

	connection.cursor().execute("CREATE INDEX IF NOT EXISTS sim_coinc_map_helper_index ON sim_coinc_map_helper (sid, cid);")

#
# FIXME NOTE FIXME this selects triple times with no more than two ifos to test the itacac sub threshold stuff
#

	connection.cursor().execute("""
CREATE TEMPORARY TABLE
        sim_coinc_map
AS
        SELECT
                sim_inspiral.simulation_id AS simulation_id,
                (
                        SELECT
                                cid
                        FROM
				sim_coinc_map_helper
                        WHERE
                                sid = simulation_id
                        ORDER BY
                                lr
			DESC
                        LIMIT 1
                ) AS coinc_event_id
        FROM
                sim_inspiral
        WHERE
                coinc_event_id IS NOT NULL;

	""")

	connection.cursor().execute("DROP INDEX sim_coinc_map_helper_index;")

	connection.cursor().execute("""
CREATE TEMPORARY TABLE
	sim_id_combined_far
AS
	SELECT
		coinc_inspiral.combined_far AS far, sim_coinc_map.simulation_id AS sim_id
	FROM
		sim_coinc_map
		JOIN coinc_inspiral ON ( coinc_inspiral.coinc_event_id == sim_coinc_map.coinc_event_id  )
	""")

	connection.cursor().execute("""
CREATE TEMPORARY TABLE
	sim_id_sngl_id
	AS
		SELECT
			sim_coinc_map.simulation_id AS sim_id, sngl_inspiral.event_id AS sngl_id
		FROM
			sim_coinc_map
		JOIN coinc_event_map as mapA ON ( mapA.coinc_event_id == sim_coinc_map.coinc_event_id )
		JOIN sngl_inspiral ON ( sngl_inspiral.event_id == mapA.event_id )
	""")

	connection.cursor().execute("CREATE INDEX IF NOT EXISTS sim_id_combined_far_index ON sim_id_combined_far (far, sim_id)")
	connection.cursor().execute("CREATE INDEX IF NOT EXISTS sim_id_sngl_id_index ON sim_id_sngl_id (sim_id, sngl_id)")

	connection.cursor().execute("""
CREATE TEMPORARY TABLE
	sim_sngl_far
	AS
		SELECT
			sngl_inspiral.process_id AS pid,
			sngl_inspiral.Gamma1 AS Gamma1,
			sim_id_combined_far.far AS far,
			sim_inspiral.simulation_id AS simulation_id,
			sim_inspiral.*
		FROM
			sim_inspiral
			JOIN sim_id_sngl_id ON (
				sim_inspiral.simulation_id == sim_id_sngl_id.sim_id
			)
			JOIN sngl_inspiral ON (
				sngl_inspiral.event_id == sim_id_sngl_id.sngl_id
			)
			JOIN sim_id_combined_far ON (
				sim_id_combined_far.sim_id == sim_id_sngl_id.sim_id
			)
	""")

	connection.cursor().execute("DROP INDEX sim_id_combined_far_index")
	connection.cursor().execute("DROP INDEX sim_id_sngl_id_index")

inj_db = options.injection_database
tmp_space = options.tmp_space
num_inspiral_jobs = options.max_number_inspiral_jobs
analysis_dir = os.path.dirname(inj_db)

working_filename = dbtables.get_connection_filename(inj_db, tmp_path = tmp_space, verbose = True)
connection = sqlite3.connect(working_filename)

create_sim_coinc_view(connection)
sim_row = {}
xmldoc = dbtables.get_xml(connection)
sim_inspiral_table = lsctables.SimInspiralTable.get_table(xmldoc)
for record in connection.cursor().execute("""
""" % (options.far_threshold,), (int(num_inspiral_jobs),)):
	process_id = record[0]
	bank_id = record[1]
	far = record[2]
	simid = record[3]
	simrow = record[4:]
	sim_row[(bank_id, process_id, simid)] = sim_inspiral_table.row_from_cols(simrow)

master_opts_dict = { 
	"gps-start-time":None,
	"gps-end-time": None,
	"psd-fft-length": 32,
	"likelihood-snapshot-interval": 100000.0,
	"track-psd": "",
	"min-instruments": None,
	"gracedb-far-threshold": 1e-6,
	"gracedb-service-url": None,
	"ht-gate-threshold": 50.0,
	"veto-segments-name": "vetoes",
	"fir-stride": 0.25,
	"gracedb-group": "CBC",
	"coincidence-threshold": 0.005,
	"control-peak-time": 0,
	"gracedb-pipeline": "gstlal",
	"data-source": None,
	"frame-segments-name": None,
	"tmp-space": None,
	"gracedb-search": "AllSky",
	"channel-name": None,
	"singles-threshold": "inf",
	"local-frame-caching":"",
	"verbose": ""
}

master_input_dict = {
	"reference-psd": None,
	"svd-bank": None, # FIXME THIS ONE IS TRICKY
	"ranking-stat-pdf": "%s/post_marginalized_likelihood.xml.gz" % analysis_dir,
	"ranking-stat-input": None, # FIXME THIS ONE IS TRICKY
	"veto-segments-file": None,
	"frame-segments-file": None,
	"frame-cache": None,
	"time-slide-file": None,
	"injections": None, # FIXME make this just a single injection with the correct parameters
	}

master_output_dict = {
	"ranking-stat-output": "not_used.xml.gz",
	"zerolag-rankingstat-pdf": "notused2.xml.gz",
	"output": None,
}

try:
	os.mkdir("logs")
except:
	pass
dag = dagparts.DAG("trigger_pipe")

gstlalInspiralInjJob = dagparts.DAGJob("gstlal_inspiral",
	tag_base="gstlal_inspiral_inj",
	condor_commands = {"request_memory":"5gb", 
		"request_cpus":"2",
		"want_graceful_removal":"True",
		"kill_sig":"15"}
	)
noIlwdcharJob = dagparts.DAGJob("ligolw_no_ilwdchar")
bashJob = dagparts.DAGJob("bash")


def updatedict(x, y):
	for k in x:
		if x[k] is None:
			try:
				x[k] = y[k]
			except KeyError as e:
				pass

def fixrelpath(x, ys):
	for y in ys:
		x[y] = "%s/%s" % (analysis_dir, x[y][0])

def new_inj_file(row, output):
	xmldoc = ligolw.Document()
	lw = xmldoc.appendChild(ligolw.LIGO_LW())
	sim_inspiral_table = lsctables.New(lsctables.SimInspiralTable)
	lw.appendChild(sim_inspiral_table)
	sim_inspiral_table.append(row)
	ligolw_utils.write_filename(xmldoc, output, gz = output.endswith('gz'))


try:
	os.mkdir("inj_files")
except OSError:
	pass

try:
	os.mkdir("lloid_files")
except OSError:
	pass

mass_model = analysis_dir + '/gstlal_inspiral_mass_model'
try:
	os.mkdir("gstlal_inspiral_mass_model")
	os.system('cp -r %s .' % (mass_model,))
except OSError:
	pass

bayesdir = "bayestar_input"
try:
	os.mkdir(bayesdir)
except OSError:
	pass
	

f = open("process.sh", "w")
f.write("""#!/usr/bin/bash
ID=$1
FILE=$(gstlal_inspiral_best_coinc_file lloid_files/${ID}/*CBC_AllSky-*-0.xml)
ligolw_no_ilwdchar ${FILE}
mkdir -p bayestar_input/${ID}/
gstlal_ligolw_add_without_reassign ${FILE} inj_files/${ID}_inj.xml.gz --output bayestar_input/${ID}/${ID}_event.xml.gz
lalapps_inspinjfind --time-window 0.9 bayestar_input/${ID}/${ID}_event.xml.gz
gstlal_ilwdify bayestar_input/${ID}/${ID}_event.xml.gz
""")
f.close()

for job_id, (bankid, process_id, simid) in enumerate(sim_row, start=1):
	# FIXME Need to add option for dist stats output
	print "++ job_id: %s ++" % job_id
	job_dict = {}
	for param, value in connection.cursor().execute("SELECT param, value FROM process_params WHERE process_id == ?", (process_id,)):
		job_dict.setdefault(param.replace("--",""), []).append(value)
	this_opts_dict = master_opts_dict.copy()
	updatedict(this_opts_dict, job_dict)
	this_input_dict = master_input_dict.copy()
	updatedict(this_input_dict, job_dict)
	this_output_dict = master_output_dict.copy()
	updatedict(this_output_dict, job_dict)

	# FIX some stuff
	fixrelpath(this_input_dict, ("reference-psd", "frame-cache", "time-slide-file", "veto-segments-file", "frame-segments-file"))

	# make a custom injection file
	inj_file_name = "inj_files/%d_%d_%d_inj.xml.gz" % (job_id, bankid, process_id)
	new_inj_file(sim_row[(bankid, process_id, simid)], inj_file_name)
	this_input_dict["injections"] = inj_file_name

	# FIXME hacks for the svd
	instruments = [x.split("=")[0] for x in this_opts_dict["channel-name"]]
	banks = ["%s:%s" % (ifo, glob.glob("%s/gstlal_svd_bank/%s-%04d_SVD*" % (analysis_dir, ifo, bankid))[0]) for ifo in instruments]
	this_input_dict["svd-bank"] = ",".join(banks)

	# FIXME don't hardcode H1L1V1
	ranking_stat_pdf = glob.glob("%s/gstlal_inspiral_marginalize_likelihood/H1L1V1-%04d_MARG_DIST_STATS*" % (analysis_dir, bankid))[0]
	this_input_dict["ranking-stat-input"] = ranking_stat_pdf

	# just name the output the same as the input
	outdir = "lloid_files/%d_%d_%d" % (job_id, bankid, process_id)
		os.mkdir(outdir)
	except OSError:
		pass
	output_file_name = "%s/%d_%d_%d_lloid.xml.gz" % (outdir, job_id, bankid, process_id)
	this_output_dict["output"] = output_file_name

	this_opts_dict["gracedb-service-url"] = "file://%s/%s" % (os.getcwd(), outdir)

	node = dagparts.DAGNode(gstlalInspiralInjJob, dag, parent_nodes = [], opts = this_opts_dict, input_files = this_input_dict, output_files = this_output_dict)

	# all the remaining post processing
	node = dagparts.DAGNode(bashJob, dag, parent_nodes = [node], opts = {"":["process.sh", "%d_%d_%d" % (job_id, bankid, process_id)]})


dag.write_sub_files()
dag.write_dag()
dag.write_script()