Commit 77cfe8e0 authored by Patrick Godwin

dags/layers/inspiral.py: find inj triggers after clustering, add in relevant auxiliary files to databases (segments, etc)
parent 3b376e5f
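
Editorial note: the new stages amount to a post-clustering round-trip for each injection subtype: extract the clustered triggers from each database to XML, match the recovered triggers against the injection set, then load the annotated triggers back into the database. The database-loading step now also pulls in relevant auxiliary inputs (the frame segments file and, for injection runs, the injection file). A minimal sketch of the equivalent command sequence follows; the file names are hypothetical, and the actual jobs are expressed as Layer/Node objects scheduled through the DAG rather than direct subprocess calls.

# Illustrative sketch only: the per-subtype command sequence the new DAG
# nodes run. File names are hypothetical.
import subprocess

db = "H1L1-TRIGGER_DATABASE_BNS-1239641219-86400.sqlite"  # hypothetical
xml = "H1L1-TRIGGERS_BNS-1239641219-86400.xml.gz"         # hypothetical
tmp_space = "/tmp"

# 1. extract the clustered injection triggers from the database to XML
subprocess.run([
    "ligolw_sqlite", "--replace", "--tmp-space", tmp_space,
    "--database", db, "--extract", xml,
], check=True)

# 2. match recovered triggers to injections within a 0.9 s time window
subprocess.run(["lalapps_inspinjfind", "--time-window", "0.9", xml], check=True)

# 3. load the injection-annotated triggers back into the database
subprocess.run([
    "ligolw_sqlite", "--replace", "--tmp-space", tmp_space,
    "--database", db, xml,
], check=True)
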
@@ -237,7 +237,6 @@ def filter_layer(config, dag, ref_psd_cache, svd_bank_cache):
 
 def aggregate_layer(config, dag, trigger_cache, dist_stat_cache):
-    # cluster triggers by SNR
     trg_layer = Layer(
         "lalapps_run_sqlite",
         name="cluster_triggers_by_snr",
@@ -250,6 +249,7 @@ def aggregate_layer(config, dag, trigger_cache, dist_stat_cache):
     snr_cluster_sql_file = os.path.join(share_path, "snr_simplify_and_cluster.sql")
     inj_snr_cluster_sql_file = os.path.join(share_path, "inj_snr_simplify_and_cluster.sql")
 
+    # cluster triggers by SNR
     for (svd_bin, inj_type), triggers in trigger_cache.groupby("bin", "subtype").items():
         trg_layer += Node(
             arguments = [
@@ -260,6 +260,8 @@ def aggregate_layer(config, dag, trigger_cache, dist_stat_cache):
             outputs = Argument("clustered-triggers", triggers.files, suppress=True),
         )
+    dag.attach(trg_layer)
 
+    # marginalize dist stats across time
     dist_layer = Layer(
         "gstlal_inspiral_marginalize_likelihood",
@@ -278,7 +280,6 @@ def aggregate_layer(config, dag, trigger_cache, dist_stat_cache):
             outputs = Option("output", agg_dist_stats.files)
         )
 
-    dag.attach(trg_layer)
     dag.attach(dist_layer)
 
     return trigger_cache, agg_dist_stat_cache
@@ -445,9 +446,27 @@ def cluster_layer(config, dag, trigger_cache):
         requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
         transfer_files=config.condor.transfer_files,
     )
+    inj_sqlite_to_xml_layer = Layer(
+        "ligolw_sqlite",
+        name="inj_sqlite_to_xml",
+        requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
+        transfer_files=config.condor.transfer_files,
+    )
+    injfind_layer = Layer(
+        "lalapps_inspinjfind",
+        requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
+        transfer_files=config.condor.transfer_files,
+    )
+    inj_xml_to_sqlite_layer = Layer(
+        "ligolw_sqlite",
+        name="inj_xml_to_sqlite",
+        requirements={"request_cpus": 1, "request_memory": 2000, **config.condor.submit},
+        transfer_files=config.condor.transfer_files,
+    )
 
     combined_trigger_cache = DataCache.generate(DataType.TRIGGERS, config.ifo_combo, config.time_bins)
     trigger_db_cache = DataCache.generate(DataType.TRIGGER_DATABASE, config.ifo_combo, config.span)
+    inj_trigger_cache = DataCache(DataType.TRIGGERS)
     if config.filter.injections:
         for inj_name, inj_args in config.filter.injections.items():
             min_mchirp, max_mchirp = map(float, inj_args["range"].split(":"))
@@ -464,6 +483,12 @@
                 config.span,
                 subtype=inj_name
             )
+            inj_trigger_cache += DataCache.generate(
+                DataType.TRIGGERS,
+                config.ifo_combo,
+                config.span,
+                subtype=inj_name
+            )
 
     combined_triggers = combined_trigger_cache.groupby("time", "subtype")
     # FIXME: find better way of discovering SQL file
@@ -472,21 +497,29 @@
     inj_cluster_sql_file = os.path.join(share_path, "inj_simplify_and_cluster.sql")
 
     # combine triggers across SVD bins
-    for key, triggers in trigger_cache.groupby("time", "subtype").items():
+    # if triggers are from an injection job, also add in the injections
+    for (span, inj_type), triggers in trigger_cache.groupby("time", "subtype").items():
         combine_layer += Node(
-            inputs = Argument("triggers", triggers.files),
-            outputs = Option("output", combined_triggers[key].files),
+            inputs = Argument("inputs", triggers.files),
+            outputs = Option("output", combined_triggers[(span, inj_type)].files),
         )
 
     combined_triggers = combined_trigger_cache.groupby("subtype")
     for inj_type, trigger_dbs in trigger_db_cache.groupby("subtype").items():
+        inputs = [
+            config.source.frame_segments_file,
+            *combined_triggers[inj_type].files,
+        ]
+        if inj_type:
+            inputs.append(config.filter.injections[inj_type.lower()]["file"])
+
         # convert triggers to sqlite
         sqlite_layer += Node(
             arguments = [
                 Option("replace"),
                 Option("tmp-space", dagutil.condor_scratch_space()),
             ],
-            inputs = Argument("calc-triggers", combined_triggers[inj_type].files),
+            inputs = Option("inputs", inputs),
             outputs = Option("database", trigger_dbs.files),
         )
@@ -496,7 +529,7 @@
                 Option("sql-file", inj_cluster_sql_file if inj_type else cluster_sql_file),
                 Option("tmp-space", dagutil.condor_scratch_space()),
             ],
-            inputs = Argument("combined-triggers", trigger_dbs.files),
+            inputs = Argument("triggers", trigger_dbs.files),
             outputs = Argument("calc-triggers", trigger_dbs.files, suppress=True),
         )
@@ -504,6 +537,41 @@
     dag.attach(sqlite_layer)
     dag.attach(cluster_layer)
 
+    # for injections: convert to XML, find injections, convert back to databases
+    trigger_dbs = trigger_db_cache.groupby("subtype")
+    for inj_type, triggers in inj_trigger_cache.groupby("subtype").items():
+        # convert triggers to XML
+        inj_sqlite_to_xml_layer += Node(
+            arguments = [
+                Option("replace"),
+                Option("tmp-space", dagutil.condor_scratch_space()),
+            ],
+            inputs = Option("database", trigger_dbs[inj_type].files),
+            outputs = Option("extract", triggers.files),
+        )
+
+        # find injections
+        injfind_layer += Node(
+            arguments = Option("time-window", 0.9),
+            inputs = Argument("triggers", triggers.files),
+            outputs = Argument("inj-triggers", triggers.files, suppress=True),
+        )
+
+        # convert triggers back to sqlite
+        inj_xml_to_sqlite_layer += Node(
+            arguments = [
+                Option("replace"),
+                Option("tmp-space", dagutil.condor_scratch_space()),
+            ],
+            inputs = Argument("triggers", triggers.files),
+            outputs = Option("database", trigger_dbs[inj_type].files),
+        )
+
+    if config.filter.injections:
+        dag.attach(inj_sqlite_to_xml_layer)
+        dag.attach(injfind_layer)
+        dag.attach(inj_xml_to_sqlite_layer)
+
     return trigger_db_cache
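
For reference, the grouping semantics these loops rely on can be mocked as below (a stand-in, not the real gstlal DataCache.groupby implementation): grouping by one field keys by the bare field value, while grouping by several fields keys by tuples, which is why the combine loop unpacks (span, inj_type) but the database loops index by inj_type alone.

# Stand-in for the DataCache.groupby semantics assumed above; not the real
# gstlal implementation.
from collections import defaultdict

def groupby(entries, *fields):
    groups = defaultdict(list)
    for entry in entries:
        key = tuple(entry[f] for f in fields)
        # a single-field groupby keys by the bare value, not a 1-tuple
        groups[key[0] if len(fields) == 1 else key].append(entry)
    return dict(groups)

entries = [
    {"time": (1239641219, 1239727619), "subtype": "bns", "path": "inj.xml"},
    {"time": (1239641219, 1239727619), "subtype": "", "path": "noninj.xml"},
]

by_time_subtype = groupby(entries, "time", "subtype")  # keys: (span, inj_type)
by_subtype = groupby(entries, "subtype")               # keys: inj_type
assert ((1239641219, 1239727619), "bns") in by_time_subtype
assert "" in by_subtype  # non-injection triggers carry an empty subtype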