
Plot horizon distance from ranking statistics

Merged ChiWai Chan requested to merge plot_psd_horizon into master
1 unresolved thread
1 file
+34 −73
@@ -21,21 +21,14 @@ from optparse import OptionParser
import os
import sys
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
GObject.threads_init()
Gst.init(None)
import lal
from gstlal import kernels
from gstlal import pipeparts
from gstlal import simplehandler
from gstlal import datasource
from gstlal import multirate_datasource
from gstlal.psd import read_psd
from ligo.lw import utils as ligolw_utils
from gstlal.stream import Stream
### This program will make fake data in a variety of ways. Its input is anything
### supported by datasource.py. You can additionally whiten the data or apply a
@@ -223,46 +216,37 @@ def parse_command_line():
options, filenames = parse_command_line()
## Create output directory
os.makedirs(options.output_path, exist_ok=True)
## Parse the command line options into a python.datasource.GWDataSourceInfo class instance
gw_data_source = datasource.DataSourceInfo.from_optparse(options)
## Assume instrument is the first and only key of the python.datasource.GWDataSourceInfo.channel_dict
instrument = list(gw_data_source.channel_dict.keys())[0]
# disable progress reports if not verbose
if not options.verbose:
    pipeparts.mkprogressreport = lambda pipeline, src, *args: src
# set default output channel if not set by user
if options.output_channel_name is None:
options.output_channel_name = gw_data_source.channel_dict[instrument]
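For context, channel_dict maps instrument names to channel names, so with a single-detector configuration the two steps above reduce to something like this (the H1 / FAKE-STRAIN values are purely illustrative):

# Purely illustrative values; the real dictionary comes from the data-source options.
channel_dict = {"H1": "FAKE-STRAIN"}

instrument = list(channel_dict.keys())[0]          # "H1"
output_channel_name = channel_dict[instrument]     # "FAKE-STRAIN", used unless options.output_channel_name is set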
# do not do injections in datasource.mkbasicsrc(), we will do them later
# do not do injections in datasource, we will do them later
injections, gw_data_source.injection_filename = options.injections, None
## Setup the pipeline
pipeline = Gst.Pipeline(name=os.path.split(sys.argv[0])[1])
## Main loop
mainloop = GObject.MainLoop()
## An instance of the python.simplehandler.Handler class
handler = simplehandler.Handler(mainloop, pipeline)
## 1) Set the pipeline head to basic input from datasource.mkbasicsrc()
## 1) Initialize stream from data source
# FIXME: fake source causes problems when making large buffers, so block_size needs to be overwritten
gw_data_source.block_size = 8 * options.sample_rate
head, _, _ = datasource.mkbasicsrc(pipeline, gw_data_source, instrument, verbose = options.verbose)
stream = Stream.from_datasource(gw_data_source, instrument, verbose=options.verbose)
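For scale, the block_size override amounts to roughly one second of data per buffer, assuming block_size is counted in bytes and the fake sources produce 8-byte (float64) samples:

# Back-of-the-envelope check of the block_size override
# (assumes block_size is in bytes and samples are 8-byte float64 values).
sample_rate = 16384                     # Hz, example value
block_size = 8 * sample_rate            # 131072 bytes
bytes_per_sample = 8
print(block_size / (bytes_per_sample * sample_rate))   # 1.0 second of data per buffer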
## 2) Set the pipeline head to be verbose with pipeparts.mkprogressreport()
head = pipeparts.mkprogressreport(pipeline, head, "frames")
## 2) Add progress report to stream if verbose
if options.verbose:
    stream = stream.progressreport("frames")
if options.shift is not None:
    ## 3) Set the pipeline head to apply a time shift if requested with a pipeparts.mkshift() element
    head = pipeparts.mkshift(pipeline, head, shift = options.shift)
    ## 3) Apply a time shift if requested with a shift() element
    stream = stream.shift(shift=options.shift)
## 4) Set the pipeline head to be verbose with a pipeparts.mkprogressreport() element
head = pipeparts.mkprogressreport(pipeline, head, "frames_shifted")
## 4) Add progress report to stream if verbose
stream = stream.progressreport("frames_shifted")
if options.whiten_reference_psd:
    ## If whitening read the PSD
@@ -272,16 +256,17 @@ else:
    wpsd = None
if options.whiten_reference_psd or options.whiten_track_psd:
    ## 5) Set the pipeline head to a whitened data stream if requested using a multirate_datasource.mkwhitened_multirate_src()
    head = multirate_datasource.mkwhitened_multirate_src(pipeline, head, [options.sample_rate], instrument, psd = wpsd, track_psd = options.whiten_track_psd)[options.sample_rate]
    ## 5) Whiten the data stream if requested with a condition() element
    stream = stream.condition(options.sample_rate, instrument, psd=wpsd, track_psd=options.whiten_track_psd)
else:
    ## 6) Otherwise simply add a pipeparts.mkcapsfilter() and pipeparts.mkresample()
    head = pipeparts.mkcapsfilter(pipeline, pipeparts.mkresample(pipeline, head, quality = 9), "audio/x-raw, rate=%d" % options.sample_rate)
    ## 6) Otherwise, resample the stream with resample() and add a capsfilter()
    stream = stream.resample(quality=9).capsfilter(f"audio/x-raw, rate={options.sample_rate}")
# FIXME this is a bit hacky, should datasource.mkbasicsrc be patched to change the sample rate?
# FIXME don't hardcode sample rate (though this is what datasource generates for all fake data, period)
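The hunk above elides the lines that load the reference PSD into wpsd; for orientation, a hedged sketch of that step, assuming read_psd() returns a dictionary of frequency series keyed by instrument (this is a sketch, not the elided code):

# Hedged sketch of the elided PSD-loading step (not the actual elided lines).
# Assumes read_psd() returns a mapping like {"H1": REAL8FrequencySeries}.
if options.whiten_reference_psd:
    wpsd = read_psd(options.whiten_reference_psd)[instrument]
else:
    wpsd = None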
# Renormalize if datasource is "white"
if options.data_source == "white":
    head = pipeparts.mkaudioamplify(pipeline, head, 1.0 / (pipeparts.audioresample_variance_gain(9, 16384, options.sample_rate))**.5)
    renormalize_factor = 1.0 / (pipeparts.audioresample_variance_gain(9, 16384, options.sample_rate))**.5
    stream = stream.audioamplify(renormalize_factor)
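About the renormalization: resampling white noise changes its variance by a gain that depends on the resampler quality and the rate ratio, which is what pipeparts.audioresample_variance_gain() reports; dividing the amplitude by sqrt(gain) restores unit variance. The effect can be reproduced with plain numpy (a standalone illustration, not the GStreamer resampler):

import numpy as np

rng = np.random.default_rng(0)
fs_in, fs_out = 16384, 4096
ratio = fs_in // fs_out
x = rng.standard_normal(64 * fs_in)            # 64 s of unit-variance white noise

# crude band-limiting decimator: windowed-sinc low-pass at the output Nyquist, then downsample
taps = np.sinc(np.arange(-64, 65) / ratio) / ratio
taps *= np.hanning(taps.size)
y = np.convolve(x, taps, mode="same")[::ratio]

gain = np.sum(taps ** 2)                       # predicted variance gain for white noise through this filter
y_corrected = y / np.sqrt(gain)                # same correction the script applies with audioamplify()
print(round(y.var(), 3), round(gain, 3), round(y_corrected.var(), 3))   # ~gain, gain, ~1.0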
# Apply a coloring filter
if options.color_psd:
@@ -304,56 +289,32 @@ if options.color_psd:
    FIRKernel = kernels.PSDFirKernel()
    fir_matrix, latency, measured_sample_rate = FIRKernel.psd_to_linear_phase_whitening_fir_kernel(rpsd, invert = False)
    ## 7) Set the pipeline head to a pipeparts.mkfirbank() recoloring filter
    head = pipeparts.mkfirbank(pipeline, head, latency = latency, fir_matrix = [fir_matrix], block_stride = 1 * options.sample_rate)
    ## 7) Recolor the data with an FIR recoloring filter
    stream = stream.firbank(latency=latency, fir_matrix=[fir_matrix], block_stride=(1 * options.sample_rate))
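The recoloring step converts the requested PSD into a linear-phase FIR kernel (invert=False, so it colors rather than whitens) and applies it with firbank(). Conceptually the kernel's magnitude response is sqrt(PSD), with its impulse response centered to make the phase linear; a simplified numpy sketch of that idea (not the actual PSDFirKernel code, and glossing over normalization):

import numpy as np

def coloring_fir_from_psd(psd):
    """Toy linear-phase coloring kernel whose magnitude response is sqrt(PSD)."""
    amp = np.sqrt(psd)                         # desired |H(f)| from 0 to Nyquist
    kernel = np.fft.irfft(amp)                 # zero-phase (circularly symmetric) impulse response
    kernel = np.roll(kernel, kernel.size // 2) # center it so the filter is causal and linear-phase
    kernel *= np.hanning(kernel.size)          # taper the edges
    latency = kernel.size // 2                 # group delay in samples, analogous to the firbank latency
    return kernel, latency

# toy target: a PSD falling as 1/f^2 above 10 Hz, sampled up to a 2048 Hz Nyquist
sample_rate = 4096
freqs = np.linspace(0.0, sample_rate / 2, 2049)
psd = 1.0 / np.maximum(freqs, 10.0) ** 2

kernel, latency = coloring_fir_from_psd(psd)
white = np.random.default_rng(0).standard_normal(4 * sample_rate)
colored = np.convolve(white, kernel, mode="same")   # analogous to stream.firbank(fir_matrix=[kernel], ...)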
## Set the tags for the output data
tagstr = "units=strain,channel-name=%s,instrument=%s" % (options.output_channel_name, instrument)
tagstr = f"units=strain,channel-name={options.output_channel_name},instrument={instrument}"
## 8) Put the units back to strain before writing to frames. Additionally, override the output channel name if provided from the command line
head = pipeparts.mktaginject(pipeline, head, tagstr)
stream = stream.taginject(tagstr)
## 9) Add injections into the final fake data
if injections is not None:
    ## 9) Do injections into the final fake data
    head = pipeparts.mkinjections(pipeline, head, injections)
if not os.path.isdir(options.output_path):
    try:
        os.makedirs(options.output_path)
    except:
        print("Unable to make directory ", options.output_path, file=sys.stderr)
        pass
else:
    print("Target output directory already exists.")
    stream = stream.injections(injections)
## 10) create frames
head = pipeparts.mkframecppchannelmux(pipeline, {"%s:%s" % (instrument, options.output_channel_name): head}, frame_duration = options.output_frame_duration, frames_per_file = options.output_frames_per_file)
stream = stream.framecppchannelmux(
    channels=f"{instrument}:{options.output_channel_name}",
    frame_duration=options.output_frame_duration,
    frames_per_file=options.output_frames_per_file
)
## 11) Write the frames to disk
head = pipeparts.mkframecppfilesink(pipeline, head, frame_type = options.output_frame_type)
framesink = stream.framecppfilesink(frame_type=options.output_frame_type)
# Put O(100000 s) frames in each directory
head.connect("notify::timestamp", pipeparts.framecpp_filesink_ldas_path_handler, (options.output_path, 5))
framesink.connect("notify::timestamp", pipeparts.framecpp_filesink_ldas_path_handler, (options.output_path, 5))
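The (options.output_path, 5) argument asks the path handler to bucket frame files by the leading five digits of their GPS timestamp, which is where the O(100000 s) per directory figure comes from. A hypothetical illustration of that bucketing (not the framecpp_filesink_ldas_path_handler source, which builds a richer LDAS-style path):

import os

def bucket_directory(output_path, gps_start, digits=5):
    """Group frame files into directories spanning 10**(10 - digits) seconds of GPS time."""
    # keep the leading `digits` digits of the 10-digit GPS time, e.g. 1187008882 -> "11870"
    bucket = str(int(gps_start))[:digits]
    return os.path.join(output_path, bucket)

print(bucket_directory("/path/to/frames", 1187008882))   # /path/to/frames/11870
print(bucket_directory("/path/to/frames", 1187099999))   # same 100000 s bucket: /path/to/frames/11870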
# Run it
if pipeline.set_state(Gst.State.READY) == Gst.StateChangeReturn.FAILURE:
    raise RuntimeError("pipeline failed to enter READY state")
datasource.pipeline_seek_for_gps(pipeline, gw_data_source.seg[0], gw_data_source.seg[1])
if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
    raise RuntimeError("pipeline failed to enter PLAYING state")
## Debugging output
if "GST_DEBUG_DUMP_DOT_DIR" in os.environ:
pipeparts.write_dump_dot(pipeline, "%s_PLAYING" % pipeline.get_name(), verbose = True)
## Setup a signal handler to intercept SIGINT in order to write the pipeline graph at ctrl+C before cleanly shutting down
class SigHandler(simplehandler.OneTimeSignalHandler):
    def do_on_call(self, signum, frame):
        pipeparts.write_dump_dot(pipeline, "%s_SIGINT" % pipeline.get_name(), verbose = True)
sighandler = SigHandler(pipeline)
mainloop.run()
stream.start()
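Taken together, the ported pipeline reads as a single fluent chain from data source to frame files. A condensed restatement of the new-style lines above (names such as options, wpsd, fir_matrix, renormalize_factor and tagstr are the ones defined earlier in the script; the optional branches are flattened into comments):

# Condensed sketch of the new pipeline, using only calls that appear in this diff.
stream = Stream.from_datasource(gw_data_source, instrument, verbose=options.verbose)
stream = stream.progressreport("frames")                                  # only when options.verbose
stream = stream.shift(shift=options.shift)                                # only when options.shift is set
stream = stream.condition(options.sample_rate, instrument,                # or resample() + capsfilter()
                          psd=wpsd, track_psd=options.whiten_track_psd)   # when no whitening is requested
stream = stream.audioamplify(renormalize_factor)                          # only for the "white" data source
stream = stream.firbank(latency=latency, fir_matrix=[fir_matrix],         # only when options.color_psd is set
                        block_stride=1 * options.sample_rate)
stream = stream.taginject(tagstr)
stream = stream.injections(injections)                                    # only when injections were requested
stream = stream.framecppchannelmux(
    channels=f"{instrument}:{options.output_channel_name}",
    frame_duration=options.output_frame_duration,
    frames_per_file=options.output_frames_per_file,
)
framesink = stream.framecppfilesink(frame_type=options.output_frame_type)
framesink.connect("notify::timestamp", pipeparts.framecpp_filesink_ldas_path_handler, (options.output_path, 5))
stream.start()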