Commit d2f24c8b authored by Patrick Godwin

gstlal_ll_inspiral_pipe, inspiral_pipe.py: remove unused code, refactor; gstlal_inspiral: modify far-trials-factor option name
parent c2f97868
@@ -258,7 +258,7 @@ def parse_command_line():
 	group = OptionGroup(parser, "Ranking Statistic Options", "Adjust ranking statistic behaviour")
 	group.add_option("--cap-singles", action = "store_true", help = "Cap singles to 1 / livetime if computing FAR. No effect otherwise")
-	group.add_option("--FAR-trialsfactor", metavar = "trials", type = "float", default = 1.0, help = "Add trials factor to FAR before uploading to gracedb")
+	group.add_option("--far-trials-factor", metavar = "trials", type = "float", default = 1.0, help = "Add trials factor to FAR before uploading to gracedb")
 	group.add_option("--chisq-type", metavar = "type", default = "autochisq", help = "Choose the type of chisq computation to perform. Must be one of (autochisq|timeslicechisq). The default is autochisq.")
 	group.add_option("--coincidence-threshold", metavar = "seconds", type = "float", default = 0.005, help = "Set the coincidence window in seconds (default = 0.005 s). The light-travel time between instruments will be added automatically in the coincidence test.")
 	group.add_option("--min-instruments", metavar = "count", type = "int", default = 2, help = "Set the minimum number of instruments that must contribute triggers to form a candidate (default = 2).")
@@ -848,7 +848,7 @@ for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url,
 		kafka_server = options.output_kafka_server,
 		cluster = True,	# options.data_source in ("lvshm", "framexmit"); if uncommented, we only cluster when running online
 		cap_singles = options.cap_singles,
-		FAR_trialsfactor = options.FAR_trialsfactor,
+		FAR_trialsfactor = options.far_trials_factor,
 		verbose = options.verbose
 	)
 	if options.verbose:
......
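
A note on the rename (a minimal sketch, not code from this repository): optparse derives an option's attribute name (its dest) from the first long flag by stripping the leading dashes and turning the remaining dashes into underscores, which is why the lookup in gstlal_inspiral changes from options.FAR_trialsfactor to options.far_trials_factor along with the flag itself.

    from optparse import OptionParser, OptionGroup

    parser = OptionParser()
    group = OptionGroup(parser, "Ranking Statistic Options")
    group.add_option("--far-trials-factor", metavar = "trials", type = "float", default = 1.0)
    parser.add_option_group(group)

    # dest is derived as "far_trials_factor" from the long flag
    options, args = parser.parse_args(["--far-trials-factor", "2.0"])
    assert options.far_trials_factor == 2.0
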
@@ -78,6 +78,262 @@ lsctables.use_in(LIGOLWContentHandler)
#

def online_inspiral_layer(dag, jobs, options):
	job_tags = []
	inj_job_tags = []

	if options.ht_gate_threshold_linear is not None:
		# Linear scale specified
		template_mchirp_dict = get_svd_bank_params(options.bank_cache.values()[0], online = True)
		mchirp_min, ht_gate_threshold_min, mchirp_max, ht_gate_threshold_max = [float(y) for x in options.ht_gate_threshold_linear.split("-") for y in x.split(":")]

	bank_groups = list(build_bank_groups(options.bank_cache, [1], options.max_jobs - 1))
	if len(options.likelihood_files) != len(bank_groups):
		raise ValueError("Likelihood files must correspond 1:1 with bank files")

	for num_insp_nodes, (svd_banks, likefile, zerolikefile) in enumerate(zip(bank_groups, options.likelihood_files, options.zerolag_likelihood_files)):
		svd_bank_string = ",".join([":".join([k, v[0]]) for k, v in svd_banks.items()])
		job_tags.append("%04d" % num_insp_nodes)

		# Calculate the appropriate ht-gate-threshold value
		threshold_values = None
		if options.ht_gate_threshold_linear is not None:
			# Linear scale specified
			# use max mchirp in a given svd bank to decide gate threshold
			bank_mchirps = [template_mchirp_dict["%04d" % int(os.path.basename(svd_file).split("-")[1].split("_")[3])][1] for svd_file in svd_banks.items()[0][1]]
			threshold_values = [(ht_gate_threshold_max - ht_gate_threshold_min) / (mchirp_max - mchirp_min) * (bank_mchirp - mchirp_min) + ht_gate_threshold_min for bank_mchirp in bank_mchirps]
		elif options.ht_gate_threshold is not None:
			threshold_values = [options.ht_gate_threshold] * len(svd_banks.items()[0][1])  # Use the ht-gate-threshold value given
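
		# Example (hypothetical values): with
		# --ht-gate-threshold-linear 0.8:15.0-45.0:100.0, a bank whose
		# max mchirp is 10.0 gets a gate threshold of
		# (100.0 - 15.0) / (45.0 - 0.8) * (10.0 - 0.8) + 15.0 ~= 32.7.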

		# Data source dag options
		if options.data_source == "framexmit":
			datasource_opts = {
				"framexmit-addr": datasource.framexmit_list_from_framexmit_dict(options.framexmit_dict),
				"framexmit-iface": options.framexmit_iface
			}
		else:
			datasource_opts = {
				"shared-memory-partition": datasource.pipeline_channel_list_from_channel_dict(options.shm_part_dict, opt = "shared-memory-partition"),
				"shared-memory-block-size": options.shared_memory_block_size,
				"shared-memory-assumed-duration": options.shared_memory_assumed_duration
			}

		common_opts = {
			"psd-fft-length": options.psd_fft_length,
			"reference-psd": options.reference_psd,
			"ht-gate-threshold": threshold_values,
			"channel-name": datasource.pipeline_channel_list_from_channel_dict(options.channel_dict),
			"state-channel-name": datasource.pipeline_channel_list_from_channel_dict(options.state_channel_dict, opt = "state-channel-name"),
			"dq-channel-name": datasource.pipeline_channel_list_from_channel_dict(options.dq_channel_dict, opt = "dq-channel-name"),
			"state-vector-on-bits": options.state_vector_on_bits,
			"state-vector-off-bits": options.state_vector_off_bits,
			"dq-vector-on-bits": options.dq_vector_on_bits,
			"dq-vector-off-bits": options.dq_vector_off_bits,
			"svd-bank": svd_bank_string,
			"tmp-space": dagparts.condor_scratch_space(),
			"track-psd": "",
			"control-peak-time": options.control_peak_time,
			"coincidence-threshold": options.coincidence_threshold,
			"fir-stride": options.fir_stride,
			"data-source": options.data_source,
			"gracedb-far-threshold": options.gracedb_far_threshold,
			"gracedb-group": options.gracedb_group,
			"gracedb-pipeline": options.gracedb_pipeline,
			"gracedb-search": options.gracedb_search,
			"gracedb-service-url": options.gracedb_service_url,
			"job-tag": job_tags[-1],
			"likelihood-snapshot-interval": options.likelihood_snapshot_interval,
			"far-trials-factor": options.far_trials_factor,
			"min-instruments": options.min_instruments,
			"time-slide-file": options.time_slide_file,
			"output-kafka-server": options.output_kafka_server
		}
		common_opts.update(datasource_opts)
		inspNode = dagparts.DAGNode(jobs['gstlalInspiral'], dag, [],
			opts = common_opts,
			input_files = {
				"ranking-stat-input": [likefile],
				"ranking-stat-pdf": options.marginalized_likelihood_file
			},
			output_files = {
				"output": "/dev/null",
				"ranking-stat-output": likefile,
				"zerolag-rankingstat-pdf": zerolikefile
			}
		)

		if str("%04d" % num_insp_nodes) in options.inj_channel_dict:
			# FIXME The node number for injection jobs currently follows the same
			# numbering system as non-injection jobs, except instead of starting at
			# 0000 the numbering starts at 1000. There is probably a better way to
			# do this in the future; this system was just the simplest to start
			# with.
			inj_job_tags.append("%04d" % (num_insp_nodes + 1000))

			injection_opts = {
				"channel-name": datasource.pipeline_channel_list_from_channel_dict_with_node_range(options.inj_channel_dict, node = job_tags[-1]),
				"state-channel-name": datasource.pipeline_channel_list_from_channel_dict(options.inj_state_channel_dict, opt = "state-channel-name"),
				"dq-channel-name": datasource.pipeline_channel_list_from_channel_dict(options.inj_dq_channel_dict, opt = "dq-channel-name"),
				"state-vector-on-bits": options.inj_state_vector_on_bits,
				"state-vector-off-bits": options.inj_state_vector_off_bits,
				"dq-vector-on-bits": options.inj_dq_vector_on_bits,
				"dq-vector-off-bits": options.inj_dq_vector_off_bits,
				"gracedb-far-threshold": options.inj_gracedb_far_threshold,
				"gracedb-group": options.inj_gracedb_group,
				"gracedb-pipeline": options.inj_gracedb_pipeline,
				"gracedb-search": options.inj_gracedb_search,
				"gracedb-service-url": options.inj_gracedb_service_url,
				"job-tag": inj_job_tags[-1],
				"likelihood-snapshot-interval": options.likelihood_snapshot_interval,
				"far-trials-factor": options.far_trials_factor,
				"min-instruments": options.min_instruments,
				"time-slide-file": options.time_slide_file
			}
			common_opts.update(injection_opts)
			inspInjNode = dagparts.DAGNode(jobs['gstlalInspiralInj'], dag, [],
				opts = common_opts,
				input_files = {
					"ranking-stat-input": [likefile],
					"ranking-stat-pdf": options.marginalized_likelihood_file
				},
				output_files = {
					"output": "/dev/null"
				}
			)

	return job_tags, inj_job_tags
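
# Example (hypothetical configuration): with three svd bank groups and
# injection channels configured for nodes "0000" and "0002",
# online_inspiral_layer() returns
# (["0000", "0001", "0002"], ["1000", "1002"]).
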
def aggregator_layer(dag, jobs, options, job_tags):
	# set up common settings for aggregation jobs
	agg_options = {
		"dump-period": 0,
		"base-dir": "aggregator",
		"job-tag": os.getcwd(),
		"num-jobs": len(job_tags),
		"num-threads": 2,
		"job-start": 0,
		"kafka-server": options.output_kafka_server,
		"data-backend": options.agg_data_backend,
	}
	if options.agg_data_backend == 'influx':
		agg_options.update({
			"influx-database-name": options.influx_database_name,
			"influx-hostname": options.influx_hostname,
			"influx-port": options.influx_port,
		})
	if options.enable_auth:
		agg_options.update({"enable-auth": ""})
	if options.enable_https:
		agg_options.update({"enable-https": ""})

	# define routes used for aggregation jobs
	snr_routes = ["%s_snr_history" % ifo for ifo in options.channel_dict]
	network_routes = ["likelihood_history", "snr_history", "latency_history"]
	usage_routes = ["ram_history"]
	state_routes = []
	for ifo in options.channel_dict.keys():
		state_routes.extend(["%s_dqvector_%s" % (ifo, state) for state in ["on", "off", "gap"]])
		state_routes.extend(["%s_statevector_%s" % (ifo, state) for state in ["on", "off", "gap"]])
		state_routes.append("%s_strain_dropped" % ifo)
	agg_routes = list(itertools.chain(snr_routes, network_routes, usage_routes, state_routes))

	# analysis-based aggregation jobs
	# FIXME don't hard code the 1000
	max_agg_jobs = 1000
	agg_job_bounds = range(0, len(job_tags), max_agg_jobs) + [max_agg_jobs]
	agg_routes = list(dagparts.groups(agg_routes, max(max_agg_jobs // (4 * len(job_tags)), 1))) + ["far_history"]
	for routes in agg_routes:
		these_options = dict(agg_options)
		these_options["route"] = routes
		if routes == "far_history":
			these_options["data-type"] = "min"
		else:
			these_options["data-type"] = "max"

		for ii, (aggstart, aggend) in enumerate(zip(agg_job_bounds[:-1], agg_job_bounds[1:])):
			these_options["job-start"] = aggstart
			these_options["num-jobs"] = aggend - aggstart
			if ii == 0:
				### elect first aggregator per route as leader
				these_options["across-jobs"] = ""
				aggNode = dagparts.DAGNode(jobs['aggLeader'], dag, [], opts = these_options)
			else:
				aggNode = dagparts.DAGNode(jobs['agg'], dag, [], opts = these_options)
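
	# Example (hypothetical numbers): with 10 inspiral jobs and two IFOs in
	# channel_dict there are 2 snr + 3 network + 1 usage + 14 state = 20
	# routes, grouped into chunks of max(1000 // (4 * 10), 1) = 25, so the
	# outer loop above runs once for that single group and once for the
	# separately appended "far_history" route.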

	# Trigger aggregation
	trigagg_options = {
		"dump-period": 0,
		"base-dir": "aggregator",
		"job-tag": os.getcwd(),
		"num-jobs": len(job_tags),
		"num-threads": 2,
		"job-start": 0,
		"kafka-server": options.output_kafka_server,
		"data-backend": options.agg_data_backend,
	}
	if options.agg_data_backend == 'influx':
		trigagg_options.update({
			"influx-database-name": options.influx_database_name,
			"influx-hostname": options.influx_hostname,
			"influx-port": options.influx_port,
		})
	if options.enable_auth:
		trigagg_options.update({"enable-auth": ""})
	if options.enable_https:
		trigagg_options.update({"enable-https": ""})

	return dagparts.DAGNode(jobs['trigagg'], dag, [], opts = trigagg_options)

def dq_monitor_layer(dag, jobs, options):
	outpath = 'aggregator'
	for ifo in options.channel_dict:
		# Data source dag options
		if options.data_source == "framexmit":
			datasource_opts = {
				"framexmit-addr": datasource.framexmit_list_from_framexmit_dict({ifo: options.framexmit_dict[ifo]}),
				"framexmit-iface": options.framexmit_iface
			}
		else:
			datasource_opts = {
				"shared-memory-partition": datasource.pipeline_channel_list_from_channel_dict({ifo: options.shm_part_dict[ifo]}),
				"shared-memory-block-size": options.shared_memory_block_size,
				"shared-memory-assumed-duration": options.shared_memory_assumed_duration
			}

		common_opts = {
			"psd-fft-length": options.psd_fft_length,
			"channel-name": datasource.pipeline_channel_list_from_channel_dict({ifo: options.channel_dict[ifo]}),
			"state-channel-name": datasource.pipeline_channel_list_from_channel_dict({ifo: options.state_channel_dict[ifo]}, opt = "state-channel-name"),
			"dq-channel-name": datasource.pipeline_channel_list_from_channel_dict({ifo: options.dq_channel_dict[ifo]}, opt = "dq-channel-name"),
			"state-vector-on-bits": options.state_vector_on_bits,
			"state-vector-off-bits": options.state_vector_off_bits,
			"dq-vector-on-bits": options.dq_vector_on_bits,
			"dq-vector-off-bits": options.dq_vector_off_bits,
			"data-source": options.data_source,
			"out-path": outpath,
			"data-backend": options.agg_data_backend,
		}
		common_opts.update(datasource_opts)
		if options.agg_data_backend == 'influx':
			common_opts.update({
				"influx-database-name": options.influx_database_name,
				"influx-hostname": options.influx_hostname,
				"influx-port": options.influx_port,
			})
		if options.enable_auth:
			common_opts.update({"enable-auth": ""})
		if options.enable_https:
			common_opts.update({"enable-https": ""})

		return dagparts.DAGNode(jobs['dq'], dag, [], opts = common_opts)
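
# Example (hypothetical configuration): with channel_dict = {"H1": "GDS-CALIB_STRAIN"},
# the single-interferometer slices above, e.g. {ifo: options.channel_dict[ifo]},
# restrict each jobs['dq'] monitor to exactly one instrument's channels.
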
def ref_psd_layer(dag, jobs, parent_nodes, segsdict, channel_dict, options):
	psd_nodes = {}
	for ifos in segsdict:
......