Plot horizon distance from ranking statistics

Merged: ChiWai Chan requested to merge plot_psd_horizon into master
1 unresolved thread
3 files changed: +209 −347
@@ -136,6 +136,7 @@ except ImportError:
# fpconst is not part of the standard library and might not be
# available
PosInf = float("+inf")
from collections import defaultdict
from functools import reduce
import gzip
import itertools
@@ -155,8 +156,6 @@ import warnings
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
GObject.threads_init()
Gst.init(None)
import lal
from lal import LIGOTimeGPS
@@ -179,10 +178,13 @@ from gstlal import inspiral
from gstlal import lloidhandler
from gstlal import lloidparts
from gstlal import pipeparts
from gstlal import pipeio
from gstlal import servicediscovery
from gstlal import simulation
from gstlal import svd_bank
from gstlal.psd import read_psd
from gstlal.stream import MessageType, Stream, StreamMap
from gstlal.stats.inspiral_lr import LnLRDensity
GSTLAL_PROCESS_START_TIME = UTCToGPS(time.gmtime())
@@ -811,27 +813,106 @@ for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url,
if options.verbose:
print("assembling pipeline ...", file=sys.stderr)
pipeline = Gst.Pipeline(name="gstlal_inspiral")
mainloop = GObject.MainLoop()
triggersrc = lloidparts.mkLLOIDmulti(
pipeline,
detectors = detectors,
banks = banks,
psd = psd,
psd_fft_length = options.psd_fft_length,
ht_gate_threshold = ht_gate_threshold,
veto_segments = veto_segments,
verbose = options.verbose,
nxydump_segment = options.nxydump_segment,
chisq_type = options.chisq_type,
track_psd = options.track_psd,
control_peak_time = options.control_peak_time,
fir_stride = options.fir_stride,
reconstruction_segment_list = reconstruction_segment_list,
track_latency = options.data_source in ("lvshm", "framexmit"),
stream = Stream.from_datasource(
detectors,
detectors.channel_dict.keys(),
state_vector=True,
dq_vector=True,
verbose=options.verbose,
)
#
# construct dictionaries of whitened, conditioned, down-sampled
# h(t) streams. NOTE: we assume all banks for each instrument
# were generated with the same processed PSD for that instrument
# and just extract the first without checking that this assumption
# is correct
#
for instrument, head in stream.items():
rates = set(rate for bank in banks[instrument] for rate in bank.get_rates())
if stream.source.is_live:
head = head.latency(name=f"{instrument}_datasource_latency", silent=True)
head = head.condition(
max(rates),
instrument,
width = 32,
statevector = stream.source.state_vector[instrument],
dqvector = stream.source.dq_vector[instrument],
psd = psd[instrument],
psd_fft_length = options.psd_fft_length,
ht_gate_threshold = ht_gate_threshold,
veto_segments = veto_segments[instrument] if veto_segments is not None else None,
fir_whiten_reference_psd = banks[instrument][0].processed_psd,
nxydump_segment = options.nxydump_segment,
track_psd = options.track_psd,
track_latency = stream.source.is_live,
)
stream[instrument] = head.multiband(rates, instrument=instrument)
#
# construct SNR slices
#
snr_slices = defaultdict(dict)
for instrument, banklist in banks.items():
for bank in banklist:
suffix = "%s%s" % (instrument, (bank.logname and "_%s" % bank.logname or ""))
snr = stream[instrument].create_snr_slices(
bank,
control_peak_time = options.control_peak_time,
fir_stride = options.fir_stride,
logname = suffix,
reconstruction_segment_list = reconstruction_segment_list,
verbose = options.verbose
)
if stream.source.is_live and instrument not in checked_ifo:
snr = snr.latency(name=f"{instrument}_snrSlice_latency", silent=True)
checked_ifo.add(instrument)
snr_slices[bank.bank_id][instrument] = snr.checktimestamps(f"timestamps_{suffix}_snr")
#
# construct trigger generators
#
itacac_props = defaultdict(dict)
for instrument, banklist in banks.items():
for bank in banklist:
# 4 is about the lowest we can do stably for
# coincidence online...
# FIXME get this to work some day
#nsamps_window = max(bank.get_rates()) / 4
nsamps_window = 1 * max(bank.get_rates())
itacac_props[bank.bank_id][instrument] = {
"n": nsamps_window,
"snr-thresh": LnLRDensity.snr_min,
"bank_filename": bank.template_bank_filename,
"sigmasq": bank.sigmasq,
"autocorrelation_matrix": pipeio.repack_complex_array_to_real(bank.autocorrelation_bank),
"autocorrelation_mask": bank.autocorrelation_mask,
}
# FIXME: find a way to use less memory without this hack
del bank.autocorrelation_bank
for i, (bank_id, head) in enumerate(snr_slices.items()):
head = StreamMap.from_dict(head)
head = head.itacac(**itacac_props[bank_id])
if stream.source.is_live and i == 0:
head = head.latency(name="all_itacac_latency", silent=True)
if options.verbose:
head = head.queue(max_size_buffers=10, max_size_bytes=0, max_size_time=0)
head = head.progressreport(f"progress_xml_bank_{bank_id}")
snr_slices[bank_id] = head
triggersrc = StreamMap.from_dict(snr_slices)
if options.verbose:
print("done", file=sys.stderr)
@@ -885,9 +966,8 @@ for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url,
if options.verbose:
print("initializing pipeline handler ...", file=sys.stderr)
handler = lloidhandler.Handler(
mainloop = mainloop,
pipeline = pipeline,
tracker = lloidhandler.LLOIDTracker(
stream,
coincs_document = inspiral.CoincsDocument(
url = output_url or "%s-%s_LLOID-%d-%d.xml.gz" % (lsctables.instrumentsproperty.set(detectors.channel_dict.keys()).replace(",", ""), options.job_tag, int(detectors.seg[0]), int(abs(detectors.seg))),
process_params = process_params,
@@ -932,31 +1012,20 @@ for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url,
cap_singles = options.cap_singles,
FAR_trialsfactor = options.far_trials_factor,
activation_counts = options.activation_counts,
track_latency = options.data_source in ("lvshm", "framexmit"),
track_latency = stream.source.is_live,
template_id_time_map = options.upload_time_before_merger,
background_collector_type = background_collector_type,
verbose = options.verbose
)
if options.verbose:
print("... pipeline handler initialized", file=sys.stderr)
if options.verbose:
print("attaching appsinks to pipeline ...", file=sys.stderr)
appsync = pipeparts.AppSync(appsink_new_buffer = handler.appsink_new_buffer)
appsinks = set(appsync.add_sink(pipeline, src, caps = Gst.Caps.from_string("application/x-lal-snglinspiral"), name = "bank_%s_sink" % bank_id) for bank_id, src in triggersrc.items())
stream.add_callback(MessageType.ELEMENT, "spectrum", tracker.on_spectrum_update)
stream.add_callback(MessageType.APPLICATION, "CHECKPOINT", tracker.on_checkpoint)
stream.add_callback(MessageType.EOS, tracker.on_eos)
if options.verbose:
print("attached %d, done" % len(appsinks), file=sys.stderr)
#
# if we request a dot graph of the pipeline, set it up
#
print("... pipeline handler initialized", file=sys.stderr)
if options.write_pipeline is not None:
pipeparts.connect_appsink_dump_dot(pipeline, appsinks, options.write_pipeline, options.verbose)
pipeparts.write_dump_dot(pipeline, "%s.%s" % (options.write_pipeline, "NULL"), verbose = options.verbose)
triggersrc.bufsink(tracker.on_buffer, caps=Gst.Caps.from_string("application/x-lal-snglinspiral"))
#
@@ -964,36 +1033,9 @@ for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url,
#
if options.data_source in ("lvshm", "framexmit"):
#
# setup sigint handler to shutdown pipeline. this is how
# the program stops gracefully, it is the only way to stop
# it. Otherwise it runs forever man.
#
# this is only done in the online case because we want
# offline jobs to just die and not write partial databases
#
signal.signal(signal.SIGINT, OneTimeSignalHandler(pipeline))
signal.signal(signal.SIGTERM, OneTimeSignalHandler(pipeline))
if options.verbose:
print("setting pipeline state to ready ...", file=sys.stderr)
if pipeline.set_state(Gst.State.READY) != Gst.StateChangeReturn.SUCCESS:
raise RuntimeError("pipeline did not enter ready state")
datasource.pipeline_seek_for_gps(pipeline, *detectors.seg)
if options.verbose:
print("setting pipeline state to playing ...", file=sys.stderr)
if pipeline.set_state(Gst.State.PLAYING) != Gst.StateChangeReturn.SUCCESS:
raise RuntimeError("pipeline did not enter playing state")
if options.write_pipeline is not None:
pipeparts.write_dump_dot(pipeline, "%s.%s" % (options.write_pipeline, "PLAYING"), verbose = options.verbose)
if options.verbose:
print("running pipeline ...", file=sys.stderr)
mainloop.run()
stream.start()
#
@@ -1001,7 +1043,7 @@ for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url,
#
handler.write_output_url(url = output_url or handler.T050017_filename("%s_LLOID" % options.job_tag, "xml.gz"), description = "%s_LLOID" % options.job_tag, verbose = options.verbose)
tracker.write_output_url(url = output_url or handler.T050017_filename("%s_LLOID" % options.job_tag, "xml.gz"), description = "%s_LLOID" % options.job_tag, verbose = options.verbose)
#
@@ -1032,13 +1074,13 @@ for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url,
#
del snr_slices
del triggersrc
del handler.pipeline
del handler
del tracker
del bank
del banks
if pipeline.set_state(Gst.State.NULL) != Gst.StateChangeReturn.SUCCESS:
raise RuntimeError("pipeline could not be set to NULL")
stream.set_state(Gst.State.NULL)
#
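For orientation, here is a minimal sketch of the stream-based flow the new code adopts, condensed from the hunks above. The helper name build_and_run and the condition_kwargs / snr_kwargs / itacac_props parameters are illustrative stand-ins for the option-derived arguments built elsewhere in gstlal_inspiral; the latency probes, checktimestamps, queue/progressreport instrumentation, and verbose plumbing shown in the diff are omitted.

# Sketch only: every call mirrors the diff above; external objects are passed in.
from collections import defaultdict

import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst

from gstlal.stream import MessageType, Stream, StreamMap


def build_and_run(detectors, banks, psd, tracker, itacac_props, condition_kwargs, snr_kwargs):
	# one h(t) branch per instrument, read directly from the data source
	stream = Stream.from_datasource(
		detectors,
		detectors.channel_dict.keys(),
		state_vector=True,
		dq_vector=True,
	)

	# whiten/condition each branch, then split it into the sample rates its banks need
	for instrument, head in stream.items():
		rates = set(rate for bank in banks[instrument] for rate in bank.get_rates())
		head = head.condition(max(rates), instrument, psd=psd[instrument], **condition_kwargs)
		stream[instrument] = head.multiband(rates, instrument=instrument)

	# per-bank SNR slices, then one itacac trigger generator per bank_id
	snr_slices = defaultdict(dict)
	for instrument, banklist in banks.items():
		for bank in banklist:
			snr_slices[bank.bank_id][instrument] = stream[instrument].create_snr_slices(bank, **snr_kwargs)
	for bank_id, heads in snr_slices.items():
		snr_slices[bank_id] = StreamMap.from_dict(heads).itacac(**itacac_props[bank_id])
	triggersrc = StreamMap.from_dict(snr_slices)

	# the tracker replaces the old appsink/mainloop handler wiring
	stream.add_callback(MessageType.ELEMENT, "spectrum", tracker.on_spectrum_update)
	stream.add_callback(MessageType.APPLICATION, "CHECKPOINT", tracker.on_checkpoint)
	stream.add_callback(MessageType.EOS, tracker.on_eos)
	triggersrc.bufsink(tracker.on_buffer, caps=Gst.Caps.from_string("application/x-lal-snglinspiral"))

	# start() subsumes the explicit READY/PLAYING transitions and mainloop.run()
	stream.start()
	stream.set_state(Gst.State.NULL)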