Commit b9612a2c authored by Qi Chu's avatar Qi Chu
Browse files

New feature:

Bypass low-sensitivity IFOs in postcoh trigger formation;
 the bypassed IFOs will be used to find corresponding SNRs for postcoh triggers.

Code changes:
Online pipeline interface change (gstlal_inspiral_postcohspiir_online):
add one option --cuda-postcoh-parti-ifos to indicate participating IFOs in postcoh, e.g. H1L1
(default). The left-out IFOs from the option --iir-bank setting will be bypassed and
only used for SNRs. E.g. --iir-bank setting: --iir-bank H1:H1bank,L1:L1bank,V1:L1bank,K1:K1bank.

One new plugin to attach SNRs from less sensitive IFOs to the postcoh trigger: trigger_jointer.

Pipeline assembly code changed to put this new plugin in the right place.
parent 8c846ec2
......@@ -457,7 +457,13 @@ def parse_command_line():
type="int",
default=800,
help="interval to refresh detrsp map.")
parser.add_option(
"--cuda-postcoh-parti-ifos",
metavar="name",
default="H1L1",
help="IFOs that will not be used in coherent search. E.g. V1K1"
)
# gpu acceleartion support
parser.add_option("--gpu-acc",
action="store_true",
......@@ -734,6 +740,7 @@ postcohsrcs = spiirparts.mkPostcohSPIIROnline(
cuda_postcoh_output_skymap=options.cuda_postcoh_output_skymap,
cuda_postcoh_detrsp_refresh_interval=options.
cuda_postcoh_detrsp_refresh_interval,
cuda_postcoh_parti_ifos=options.cuda_postcoh_parti_ifos,
cohfar_file_path=options.job_tag,
cohfar_accumbackground_output_prefix=options.
cohfar_accumbackground_output_prefix,
......
......@@ -25,7 +25,8 @@ AC_CONFIG_FILES([ \
lib/Makefile \
bin/Makefile \
gst/Makefile \
gst/cuda/Makefile
gst/cuda/Makefile \
gst/lal/Makefile
])
......@@ -372,7 +373,7 @@ AC_SUBST([XML_LIBS])
# custom cflags and libs
#
GSTLAL_SPIIR_CFLAGS="-I\$(top_srcdir)/include -I\$(top_srcdir)/lib/include"
GSTLAL_SPIIR_LIBS="-L\$(top_srcdir)/lib -lgstlalspiir"
GSTLAL_SPIIR_LIBS="-L\$(top_srcdir)/lib"
AC_SUBST([GSTLAL_SPIIR_CFLAGS])
AC_SUBST([GSTLAL_SPIIR_LIBS])
#
......
......@@ -4,5 +4,6 @@ else
CUDA_SUBDIRS =
endif
LAL_SUBDIRS = lal
SUBDIRS = $(CUDA_SUBDIRS)
SUBDIRS = $(CUDA_SUBDIRS) $(LAL_SUBDIRS)
ADD_CFLAGS = $(GSTLAL_SPIIR_CFLAGS) $(LAPACKE_CFLAGS) $(SPIIR_FIX_COMPLEX_CFLAGS)
ADD_LIBS= $(top_srcdir)/lib/libgstlalspiir.la
plugin_LTLIBRARIES = libgstlalspiir.la
libgstlalspiir_la_SOURCES = \
gstlalspiir.c \
triggerjointer/triggerjointer.c
libgstlalspiir_la_CFLAGS = $(ADD_CFLAGS) $(GSL_CFLAGS) $(LAL_CFLAGS) $(GSTLAL_CFLAGS) $(gstreamer_CFLAGS)
libgstlalspiir_la_LIBADD = $(ADD_LIBS)
libgstlalspiir_la_LDFLAGS = $(AM_LDFLAGS) $(GSL_LIBS) $(LAL_LIBS) $(GSTLAL_LIBS) $(gstreamer_LIBS) $(GSTLAL_PLUGIN_LDFLAGS)
/*
*
* Copyright (C) 2020 Qi Chu, Tom Almeida
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Element to attach information from theskipped instrument to the trigger table
*/
/*
* ============================================================================
*
* Preamble
*
* ============================================================================
*/
/*
* Stuff from GStreamer
*/
#include <gst/gst.h>
/*
* Our own stuff
*/
#include <triggerjointer/triggerjointer.h>
/*
* ============================================================================
*
* Plugin Entry Point
*
* ============================================================================
*/
static gboolean plugin_init(GstPlugin *plugin) {
struct {
const gchar *name;
GType type;
} * element, elements[] = {
{ "trigger_jointer", GSTLAL_TYPE_TRIGGER_JOINTER},
{ NULL, 0 },
};
/*
* Tell GStreamer about the elements.
*/
for (element = elements; element->name; element++)
if (!gst_element_register(plugin, element->name, GST_RANK_NONE,
element->type))
return FALSE;
/*
* Done.
*/
return TRUE;
}
/*
* This is the structure that gst-register looks for.
*/
GST_PLUGIN_DEFINE(GST_VERSION_MAJOR,
GST_VERSION_MINOR,
"gstlalspiir",
"elements to handle spiir triggers",
plugin_init,
PACKAGE_VERSION,
"GPL",
PACKAGE_NAME,
"http://www.lsc-group.phys.uwm.edu/daswg")
This diff is collapsed.
/*
* Copyright (C) 2020 Qi Chu <qi.chu@ligo.org>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#ifndef __TRIGGER_JOINTER_H__
#define __TRIGGER_JOINTER_H__
#include <glib.h>
#include <gst/base/gstadapter.h>
#include <gst/base/gstcollectpads.h>
#include <gst/gst.h>
#include <pipe_macro.h>
#include <postcohtable.h>
#include <lal/LIGOMetadataTables.h>
G_BEGIN_DECLS
#define GSTLAL_TYPE_TRIGGER_JOINTER (trigger_jointer_get_type())
#define TRIGGER_JOINTER(obj) \
(G_TYPE_CHECK_INSTANCE_CAST((obj), GSTLAL_TYPE_TRIGGER_JOINTER, TriggerJointer))
#define TRIGGER_JOINTER_CLASS(klass) \
(G_TYPE_CHECK_CLASS_CAST((klass), GSTLAL_TYPE_TRIGGER_JOINTER, TriggerJointerClass))
#define GST_IS_TRIGGER_JOINTER(obj) \
(G_TYPE_CHECK_INSTANCE_TYPE((obj), GSTLAL_TYPE_TRIGGER_JOINTER))
#define GST_IS_TRIGGER_JOINTER_CLASS(klass) \
(G_TYPE_CHECK_CLASS_TYPE((klass), GSTLAL_TYPE_TRIGGER_JOINTER))
typedef struct _TriggerJointer TriggerJointer;
typedef struct _TriggerJointerClass TriggerJointerClass;
#ifndef DEFINED_COMPLEX_F
#define DEFINED_COMPLEX_F
typedef struct _Complex_F {
float re;
float im;
} COMPLEX_F;
#else
#endif
typedef struct _TriggerJointerCollectData TriggerJointerCollectData;
typedef void (*TriggerJointerPeakfinder)(gpointer d_snglsnr, gint size);
struct _TriggerJointerCollectData {
GstCollectData data;
int is_snr;
gboolean is_aligned;
guint64 aligned_offset0;
guint64 next_offset;
GstCollectDataDestroyNotify destroy_notify;
/* the following structures are only used for snr pads */
gchar *ifo_name;
gint ifo_mapping;
gint rate;
gint channels;
gint width;
gint bps;
double offset_per_nanosecond;
GstAdapter *adapter;
GArray *flag_segments;
};
/**
* TriggerJointer:
*
* Opaque data structure.
*/
struct _TriggerJointer {
GstElement element;
/* <private> */
GstPad *srcpad;
/* sink pads composed of a number of SNR pads and one postcoh pad */
GSList *collect_snrdata;
TriggerJointerCollectData *collect_postcohdata;
/* boilder plate collect pads
* pad added to this strucuture
* will invoke collected function
* if buf comes in */
GstCollectPads *collect;
gint rate;
gint channels;
gint width;
gint bps;
gboolean is_t0_set;
gboolean is_all_aligned;
gboolean is_next_tstart_set;
double offset_per_nanosecond;
GstClockTime t0;
GstClockTime tstart;
GstClockTime next_tstart;
guint64 offset0;
gint output_skymap;
/* sink event handling */
GstPadEventFunction collect_event;
};
struct _TriggerJointerClass {
GstElementClass parent_class;
};
GType trigger_jointer_get_type(void);
G_END_DECLS
#endif /* __TRIGGER_JOINTER_H__ */
......@@ -112,6 +112,11 @@ def mkcudamultiratespiir(pipeline,
**properties)
return elem
def mktrigger_jointer(pipeline,
head):
elem = gst.element_factory_make("trigger_jointer")
pipeline.add(elem)
return elem
def mkcudapostcoh(pipeline,
snr,
......@@ -173,7 +178,6 @@ def mkcohfar_accumbackground(pipeline,
if output_name is not None:
properties["output_name"] = output_name
print("source type %d" % source_type)
if "name" in properties:
elem = gst.element_factory_make("cohfar_accumbackground",
properties.pop("name"))
......
......@@ -143,7 +143,7 @@ def mkSPIIRmulti(pipeline,
# construct trigger generators
#
# format of banklist : {'H1': <H1Bank0>, <H1Bank1>..;
# 'L1': <L1Bank0>, <L1Bank1>..;..}
# 'L1': <L1Bank0>, <L1Bank1>..;..}
# format of bank: <H1bank0>
triggersrcs = dict((instrument, set()) for instrument in hoftdicts)
......@@ -277,7 +277,7 @@ def mkSPIIRhoftToSnrSlices(pipeline,
head.link(adder)
prehead.link(adder)
head = adder
# head = pipeparts.mkadder(pipeline, (head, prehead))
# head = pipeparts.mkadder(pipeline, (head, prehead))
# FIXME: this should get a nofakedisconts after it until the resampler is patched
head = pipeparts.mkresample(pipeline, head, quality=1)
if sr == max_rate:
......@@ -383,10 +383,10 @@ def mkBuildBossSPIIR(pipeline,
triggersrcs = dict((instrument, set()) for instrument in hoftdicts)
# format of banklist : {'H1': <H1Bank0>, <H1Bank1>..;
# 'L1': <L1Bank0>, <L1Bank1>..;..}
# 'L1': <L1Bank0>, <L1Bank1>..;..}
# format of bank: <H1bank0>
# pdb.set_trace()
# pdb.set_trace()
for instrument, bank_name in [(instrument, bank_name)
for instrument, banklist in banks.items()
for bank_name in banklist]:
......@@ -404,7 +404,7 @@ def mkBuildBossSPIIR(pipeline,
"audio/x-raw-float, rate=%d" % max_bank_rate)
snr = pipeparts.mkreblock(pipeline, head)
# bank_struct = build_bank_struct(bank, max_rates[instrument])
# bank_struct = build_bank_struct(bank, max_rates[instrument])
snr = pipemodules.mkcudamultiratespiir(
pipeline, snr, bank_name, gap_handle=0,
stream_id=bankid) # treat gap as zeros
......@@ -467,196 +467,14 @@ def mkBuildBossSPIIR(pipeline,
return triggersrcs
def mkPostcohSPIIR(pipeline,
detectors,
banks,
psd,
psd_fft_length=8,
ht_gate_threshold=None,
veto_segments=None,
verbose=False,
nxydump_segment=None,
chisq_type='autochisq',
track_psd=False,
block_duration=gst.SECOND,
blind_injections=None,
cuda_postcoh_snglsnr_thresh=4,
cuda_postcoh_detrsp_fname=None,
cuda_postcoh_hist_trials=1,
output_prefix=None,
cuda_postcoh_output_skymap=0):
# pdb.set_trace()
#
# check for recognized value of chisq_type
#
if chisq_type not in ['autochisq']:
raise ValueError("chisq_type must be either 'autochisq', given %s" %
chisq_type)
#
# extract segments from the injection file for selected reconstruction
#
if detectors.injection_filename is not None:
inj_seg_list = simulation.sim_inspiral_to_segment_list(
detectors.injection_filename)
else:
inj_seg_list = None
#
# Check to see if we are specifying blind injections now that we know
# we don't want real injections. Setting this
# detectors.injection_filename will ensure that injections are added
# but won't only reconstruct injection segments.
#
detectors.injection_filename = blind_injections
#
# construct dictionaries of whitened, conditioned, down-sampled
# h(t) streams
#
#
hoftdicts = {}
max_instru_rates = {}
sngl_max_rate = 0
for instrument in detectors.channel_dict:
for instrument_from_bank, bank_list in [
(instrument_from_bank, bank_list) for bank_dict in banks
for instrument_from_bank, bank_list in bank_dict.items()
]:
if instrument_from_bank == instrument:
sngl_max_rate = max(
spiir_utils.get_maxrate_from_xml(bank_list[0]),
sngl_max_rate)
max_instru_rates[instrument] = sngl_max_rate
src, statevector, dqvector = datasource.mkbasicsrc(pipeline,
detectors,
instrument,
verbose=verbose)
if veto_segments is not None:
hoftdicts[instrument] = snglrate_datasource.mkwhitened_src(
pipeline,
src,
sngl_max_rate,
instrument,
psd=psd[instrument],
psd_fft_length=psd_fft_length,
ht_gate_threshold=ht_gate_threshold,
veto_segments=veto_segments[instrument],
seekevent=detectors.seekevent,
nxydump_segment=nxydump_segment,
track_psd=track_psd,
zero_pad=0,
width=32)
else:
hoftdicts[instrument] = snglrate_datasource.mkwhitened_src(
pipeline,
src,
sngl_max_rate,
instrument,
psd=psd[instrument],
psd_fft_length=psd_fft_length,
ht_gate_threshold=ht_gate_threshold,
veto_segments=None,
seekevent=detectors.seekevent,
nxydump_segment=nxydump_segment,
track_psd=track_psd,
zero_pad=0,
width=32)
#
# construct trigger generators
#
triggersrcs = []
# format of banks : [{'H1': <H1Bank0>; 'L1': <L1Bank0>..;}
# {'H1': <H1Bank1>; 'L1': <L1Bank1>..;}
# ...]
# format of bank_dict: {'H1': <H1Bank1>; 'L1': <L1Bank1>..;}
autocorrelation_fname_list = []
for bank_dict in banks:
autocorrelation_fname = ""
for instrument, bank_list in bank_dict.items():
autocorrelation_fname += str(instrument)
autocorrelation_fname += ":"
autocorrelation_fname += str(bank_list[0])
autocorrelation_fname += ","
if len(bank_list) != 1:
raise ValueError(
"%s instrument: number of banks is not equal to 1, can not do coherent analysis"
% instrument)
autocorrelation_fname = autocorrelation_fname.rstrip(',')
autocorrelation_fname_list.append(autocorrelation_fname)
for instrument in banks[0].keys():
hoftdicts[instrument] = pipeparts.mktee(pipeline,
hoftdicts[instrument])
for i_dict, bank_dict in enumerate(banks):
postcoh = None
head = None
for instrument, bank_list in bank_dict.items():
bankname = bank_list[0]
bankid = spiir_utils.get_bankid_from_bankname(bankname)
max_bank_rate = spiir_utils.get_maxrate_from_xml(bankname)
head = pipeparts.mkqueue(pipeline,
hoftdicts[instrument],
max_size_time=gst.SECOND * 10,
max_size_buffers=0,
max_size_bytes=0)
if max_bank_rate < max_instru_rates[instrument]:
head = pipeparts.mkcapsfilter(
pipeline, pipeparts.mkresample(pipeline, head, quality=9),
"audio/x-raw-float, rate=%d" % max_bank_rate)
suffix = "%s_%d" % (instrument, bankid)
head = pipeparts.mkreblock(pipeline, head)
snr = pipeparts.mkcudamultiratespiir(
pipeline, head, bankname, gap_handle=0,
stream_id=bankid) # treat gap as zeros
if verbose:
snr = pipeparts.mkprogressreport(
pipeline, snr, "progress_done_gpu_filtering_%s" % suffix)
if postcoh is None:
postcoh = pipemodules.mkcudapostcoh(
pipeline,
snr,
instrument,
cuda_postcoh_detrsp_fname,
autocorrelation_fname_list[i_dict],
bank_list[0],
hist_trials=cuda_postcoh_hist_trials,
snglsnr_thresh=cuda_postcoh_snglsnr_thresh,
output_skymap=cuda_postcoh_output_skymap,
stream_id=bankid)
else:
snr.link_pads(None, postcoh, instrument)
# FIXME: hard-coded to do compression
if verbose:
postcoh = pipeparts.mkprogressreport(
pipeline, postcoh, "progress_xml_dump_bank_stream%d" % i_dict)
head = mkpostcohfilesink(pipeline,
postcoh,
location=output_prefix,
compression=1,
snapshot_interval=0)
triggersrcs.append(head)
return triggersrcs
def parse_shift_string(shift_string):
"""
parses strings of form
parses strings of form
det1:shift1, det2:shift2
into a dictionary of lists of shifts.
"""
det1:shift1, det2:shift2
into a dictionary of lists of shifts.
"""
out = {}
if shift_string is None:
return out
......@@ -690,6 +508,7 @@ def mkPostcohSPIIROnline(pipeline,
cuda_postcoh_hist_trials=1,
cuda_postcoh_output_skymap=0,
cuda_postcoh_detrsp_refresh_interval=0,
cuda_postcoh_parti_ifos=None,
cohfar_file_path=None,
cohfar_accumbackground_output_prefix=None,
cohfar_accumbackground_output_name=None,
......@@ -782,9 +601,9 @@ def mkPostcohSPIIROnline(pipeline,
for instrument in banks[0].keys():
ifos += str(instrument)
# format of banks : [{'H1': <H1Bank0>; 'L1': <L1Bank0>..;}
# {'H1': <H1Bank1>; 'L1': <L1Bank1>..;}
# ...]
# format of banks : [{'H1': <H1Bank0>; 'L1': <L1Bank0>..;}
# {'H1': <H1Bank1>; 'L1': <L1Bank1>..;}
# ...]
# format of bank_dict: {'H1': <H1Bank1>; 'L1': <L1Bank1>..;}
# assemble autocorrelation_fname for postcoh chisq calculation
......@@ -812,6 +631,13 @@ def mkPostcohSPIIROnline(pipeline,
for i_dict, bank_dict in enumerate(banks):
postcoh = None
head = None
#
# IFOs that do not participate in the coherent search (the postcoh plugin)
# will be linked to the trigger_jointer plugin
#
# initialize jointer flag
is_jointer_created = False
for instrument, bank_list in bank_dict.items():
bankname = bank_list[0]
......@@ -838,13 +664,14 @@ def mkPostcohSPIIROnline(pipeline,
snr = pipemodules.mkcudamultiratespiir(
pipeline, head, bank_list[0], gap_handle=0,
stream_id=bankid) # treat gap as zeros
if verbose:
snr = pipeparts.mkprogressreport(
pipeline, snr, "progress_done_gpu_filtering_%s" % suffix)
snr = pipeparts.mktee(pipeline, snr)
if nxydump_segment is not None:
snr = pipeparts.mktee(pipeline, snr)
pipeparts.mknxydumpsink(
pipeline,
pipeparts.mkqueue(pipeline, snr),
......@@ -858,24 +685,35 @@ def mkPostcohSPIIROnline(pipeline,
max_size_buffers=10,
max_size_bytes=100000000)
if postcoh is None:
# make a queue for postcoh, otherwise it will be in the same thread with the first bank
postcoh = pipemodules.mkcudapostcoh(
pipeline,
snr,
instrument,
cuda_postcoh_detrsp_fname,
autocorrelation_fname_list[i_dict],
bank_list[0],
hist_trials=cuda_postcoh_hist_trials,
snglsnr_thresh=cuda_postcoh_snglsnr_thresh,
cohsnr_thresh=cuda_postcoh_cohsnr_thresh,
output_skymap=cuda_postcoh_output_skymap,
detrsp_refresh_interval=
cuda_postcoh_detrsp_refresh_interval,