From 3218ad43c0ddd708296ceadbbcb34513ad888664 Mon Sep 17 00:00:00 2001
From: Duncan Meacher <duncan.meacher@ligo.org>
Date: Tue, 18 Sep 2018 12:44:58 -0500
Subject: [PATCH] gstlal_ll_inspiral_pipe: Added lvshm funtionality

---
 gstlal-inspiral/bin/gstlal_ll_inspiral_pipe | 269 +++++++++-----------
 gstlal/python/datasource.py                 |   2 +-
 2 files changed, 125 insertions(+), 146 deletions(-)

diff --git a/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe b/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe
index 085a160242..b88ec070cb 100755
--- a/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe
+++ b/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe
@@ -65,62 +65,6 @@ from lal.utils import CacheEntry
 #	lvalert_listen [style=filled, color=lightgrey, URL="https://www.lsc-group.phys.uwm.edu/daswg/docs/howto/lvalert-howto.html"];
 # }
 # @enddot
-#
-# ### Usage cases
-#
-# - Typical usage case
-#
-# ### Command line options
-#
-#		"--psd-fft-length", metavar = "s", default = 32, type = "int", help = "FFT length, default 32s.  Note that 50% will be used for zero-padding.")
-#		"--reference-psd", metavar = "filename", help = "Set the reference psd file.")
-#		"--bank-cache", metavar = "filenames", help = "Set the bank cache files in format H1=H1.cache,H2=H2.cache, etc..")
-#		"--channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the channel to process (optional).  The default is \"LSC-STRAIN\" for all detectors. Override with IFO=CHANNEL-NAME can be given multiple times")
-#		"--state-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the state channel to process (required).")
-#		"--dq-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the DQ channel to process (required).")
-#		"--framexmit-addr", metavar = "name", default=[], action = "append", help = "Set the framexmit address to process (required). IFO=ADDR:port can be given multiple times.")
-#		"--framexmit-iface", metavar = "name", default = "10.14.0.1", help = "Set the interface address to process (required). default 10.14.0.1")
-#		"--inj-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the injection channel to process (optional). IFO=CHANNEL-NAME can be given multiple times.")
-#		"--inj-state-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the injection state channel to process (required if --inj-channel-name set).")
-#		"--inj-dq-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the injection DQ channel to process (required if --inj-channel-name set).")
-#		"--inj-framexmit-addr", metavar = "name", default=[], action = "append", help = "Set the framexmit address to process for the injection stream (required if --inj-channel-name set). IFO=ADDR:port can be given multiple times.")
-#		"--inj-framexmit-iface", metavar = "name", default "10.14.0.1", action = "append", help = "Set the interface address to process for injections (required if --inj-channel-name set). default 10.14.0.1")
-#		"--ht-gate-threshold", metavar = "float", help = "Set the h(t) gate threshold to reject glitches", type="float")
-#		"--ht-gate-threshold-linear", metavar = "string", help = "Set the scale for h(t) gate threshold to reject glitches", type="string". set as mchirp_min:ht_gate_threshold_min-mchirp_max:ht_gate_threshold_max (example: --ht-gate-threshold-linear 0.8:12.0-45.0:100.0)
-#		"--max-jobs", metavar = "num", type = "int", help = "stop parsing the cache after reaching a certain number of jobs to limit what is submitted to the HTCondor pool")
-#		"--likelihood-cache", help = "set the cache containin likelihood files")	
-#		"--zerolag-likelihood-cache", help = "set the cache containin zerolag likelihood files")	
-#		"--marginalized-likelihood-file", help = "set the marginalized likelihood file, required")	
-#		"--control-peak-time", default = 4, metavar = "secs", help = "set the control peak time, default 4")
-#		"--fir-stride", default = 4, metavar = "secs", help = "set the fir bank stride, default 4")
-#		"--thinca-interval", default = 10, metavar = "secs", help = "set the thinca interval, default 10")
-#		"--gracedb-far-threshold", type = "float", help = "false alarm rate threshold for gracedb (Hz), if not given gracedb events are not sent")
-#		"--gracedb-search", default = "LowMass", help = "gracedb type, default LowMass")
-#		"--gracedb-pipeline", default = "gstlal", help = "gracedb type, default gstlal")
-#		"--gracedb-group", default = "Test", help = "gracedb group, default Test")
-#		"--gracedb-service-url", default = "https://gracedb.ligo.org/api/", help = "GraceDb service url, default https://gracedb.ligo.org/api/")
-#		"--inj-gracedb-far-threshold", type = "float", help = "false alarm rate threshold for gracedb (Hz), if not given gracedb events are not sent (for injection stream)")
-#		"--inj-gracedb-group", default = "Test", help = "gracedb group, default Test (for injection stream)")
-#		"--inj-gracedb-search", default = "LowMass", help = "gracedb type, default LowMass (for injection stream)")
-#		"--inj-gracedb-pipeline", default = "gstlal", help = "gracedb type, default gstlal (for injection stream)")
-#		"--inj-gracedb-service-url", default = "https://simdb.cgca.uwm.edu/api/", help = "GraceDb service url, default https://simdb.cgca.uwm.edu/api/ (for injection stream)")
-#		"--data-source", metavar = "[lvshm|]", default = "lvshm", help = "Where to get the data from. Default lvshm")
-#		"--veto-segments-file", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load vetoes (optional).")
-#		"--veto-segments-name", metavar = "name", help = "Set the name of the segments to extract from the segment tables and use as the veto list.", default = "vetoes")
-#		"--state-vector-on-bits", metavar = "name", default = [], action = "append", help = "Set the state vector on bits to process (optional).  The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times")
-#		"--state-vector-off-bits", metavar = "name", default = [], action = "append", help = "Set the state vector off bits to process (optional).  The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times")
-#		"--inj-state-vector-on-bits", metavar = "name", default = [], action = "append", help = "Set the state vector on bits to process (optional).  The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
-#		"--inj-state-vector-off-bits", metavar = "name", default = [], action = "append", help = "Set the state vector off bits to process (optional).  The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
-#		"--dq-vector-on-bits", metavar = "name", default = [], action = "append", help = "Set the DQ vector on bits to process (optional).  The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times")
-#		"--dq-vector-off-bits", metavar = "name", default = [], action = "append", help = "Set the DQ vector off bits to process (optional).  The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times")
-#		"--dq-state-vector-on-bits", metavar = "name", default = [], action = "append", help = "Set the DQ vector on bits to process (optional).  The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
-#		"--inj-dq-vector-off-bits", metavar = "name", default = [], action = "append", help = "Set the DQ vector off bits to process (optional).  The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
-#		"--lvalert-listener-program", action = "append", default = [], metavar = "program", help = "set the programs to respond to lvalerts from this analysis, can be given multiple times")
-#		"--coincidence-threshold", metavar = "value", type = "float", default = 0.005, help = "Set the coincidence window in seconds (default = 0.005).  The light-travel time between instruments will be added automatically in the coincidence test.")
-#		"--likelihood-snapshot-interval", type = "float", metavar = "seconds", help = "How often to reread the marginalized likelihoood data and snapshot the trigger files.")
-#		"--non-inspiral-condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value can be given multiple times")
-#		"--inspiral-condor-command", action = "append", default = [], metavar = "command=value", help = "set condor commands of the form command=value for inspiral jobs can be given multiple times")
-
 
 class lvalert_listen_job(inspiral_pipe.generic_job):
 	"""
@@ -247,16 +191,15 @@ group.initial.rebalance.delay.ms=0
 
 def parse_command_line():
 	parser = OptionParser(description = __doc__)
+
+	# append all the datasource specific options
+	datasource.append_options(parser)
+
 	parser.add_option("--psd-fft-length", metavar = "s", default = 32, type = "int", help = "FFT length, default 32s.  Note that 50% will be used for zero-padding.")
 	parser.add_option("--reference-psd", metavar = "filename", help = "Set the reference psd file.")
 	parser.add_option("--bank-cache", metavar = "filenames", help = "Set the bank cache files in format H1=H1.cache,H2=H2.cache, etc..")
 	parser.add_option("--min-instruments", metavar = "count", type = "int", default = 2, help = "Set the minimum number of instruments that must contribute triggers to form a candidate (default = 2).")
 	parser.add_option("--min-log-L", metavar = "log likelihood ratio", type = "float", help = "Discard candidates that get assigned log likelihood ratios below this threshold (default = keep all).")
-	parser.add_option("--channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the channel to process (optional).  The default is \"LSC-STRAIN\" for all detectors. Override with IFO=CHANNEL-NAME can be given multiple times")
-	parser.add_option("--state-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the state channel to process (required).")
-	parser.add_option("--dq-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the DQ channel to process (required).")
-	parser.add_option("--framexmit-addr", metavar = "name", default=[], action = "append", help = "Set the framexmit address to process (required). IFO=ADDR:port can be given multiple times.")
-	parser.add_option("--framexmit-iface", metavar = "name", help = "Set the interface address to process (required).")
 	parser.add_option("--inj-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the injection channel to process for given mass bins (optional). 0000:0002:IFO1=CHANNEL-NAME1,IFO2=CHANNEL-NAME2 can be given multiple times.")
 	parser.add_option("--inj-state-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the injection state channel to process (required if --inj-channel-name set).")
 	parser.add_option("--inj-dq-channel-name", metavar = "name", default=[], action = "append", help = "Set the name of the injection DQ channel to process (required if --inj-channel-name set).")
@@ -281,15 +224,10 @@ def parse_command_line():
 	parser.add_option("--inj-gracedb-pipeline", default = "gstlal", help = "gracedb type, default gstlal (for injection stream)")
 	parser.add_option("--inj-gracedb-group", default = "Test", help = "gracedb group, default Test (for injection stream)")
 	parser.add_option("--inj-gracedb-service-url", default = "https://simdb.cgca.uwm.edu/api/", help = "GraceDb service url, default https://simdb.cgca.uwm.edu/api/ (for injection stream)")
-	parser.add_option("--data-source", metavar = "[lvshm|]", default = "lvshm", help = "Where to get the data from. Default lvshm")
 	parser.add_option("--veto-segments-file", metavar = "filename", help = "Set the name of the LIGO light-weight XML file from which to load vetoes (optional).")
 	parser.add_option("--veto-segments-name", metavar = "name", help = "Set the name of the segments to extract from the segment tables and use as the veto list.", default = "vetoes")
-	parser.add_option("--state-vector-on-bits", metavar = "name", default = [], action = "append", help = "Set the state vector on bits to process (optional).  The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times")
-	parser.add_option("--state-vector-off-bits", metavar = "name", default = [], action = "append", help = "Set the state vector off bits to process (optional).  The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times")
 	parser.add_option("--inj-state-vector-on-bits", metavar = "name", default = [], action = "append", help = "Set the state vector on bits to process (optional).  The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
 	parser.add_option("--inj-state-vector-off-bits", metavar = "name", default = [], action = "append", help = "Set the state vector off bits to process (optional).  The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
-	parser.add_option("--dq-vector-on-bits", metavar = "name", default = [], action = "append", help = "Set the DQ vector on bits to process (optional).  The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times")
-	parser.add_option("--dq-vector-off-bits", metavar = "name", default = [], action = "append", help = "Set the DQ vector off bits to process (optional).  The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times")
 	parser.add_option("--inj-dq-vector-on-bits", metavar = "name", default = [], action = "append", help = "Set the DQ vector on bits to process (optional).  The default is 0x7 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
 	parser.add_option("--inj-dq-vector-off-bits", metavar = "name", default = [], action = "append", help = "Set the DQ vector off bits to process (optional).  The default is 0x160 for all detectors. Override with IFO=bits can be given multiple times (for injection stream)")
 	parser.add_option("--lvalert-listener-program", action = "append", default = [], metavar = "program", help = "set the programs to respond to lvalerts from this analysis, can be given multiple times")
@@ -307,6 +245,12 @@ def parse_command_line():
 
 	options, filenames = parser.parse_args()
 
+	#
+	# extract data source configuration
+	#
+
+	datasourceinfo = datasource.GWDataSourceInfo(options)
+
 	fail = ""
 	for option in ("bank_cache",):
 		if getattr(options, option) is None:
@@ -318,13 +262,16 @@ def parse_command_line():
 	else:
 		inj_name_dict = {}
 
+	if options.data_source not in datasourceinfo.live_sources :
+		raise ValueError("datasource option not supported for online analysis. Only framexmit and lvshm are supported.")
+
 	#FIXME add consistency check?
 	bankcache = inspiral_pipe.parse_cache_str(options.bank_cache)
-	channel_dict = datasource.channel_dict_from_channel_list(options.channel_name)
-	state_channel_dict = datasource.channel_dict_from_channel_list(options.state_channel_name)
-	dq_channel_dict = datasource.channel_dict_from_channel_list(options.dq_channel_name)
-	framexmit_dict = datasource.framexmit_ports['CIT'] # set the default
-	framexmit_dict.update(datasource.framexmit_dict_from_framexmit_list(options.framexmit_addr))
+	channel_dict = datasourceinfo.channel_dict
+	state_channel_dict = datasourceinfo.state_channel_dict
+	dq_channel_dict = datasourceinfo.dq_channel_dict
+	framexmit_dict = datasourceinfo.framexmit_addr
+	shared_memory_partition_dict = datasourceinfo.shm_part_dict
 	inj_channel_dict = datasource.channel_dict_from_channel_list_with_node_range(options.inj_channel_name)
 	inj_state_channel_dict = datasource.channel_dict_from_channel_list(options.inj_state_channel_name)
 	inj_dq_channel_dict = datasource.channel_dict_from_channel_list(options.inj_dq_channel_name)
@@ -341,13 +288,13 @@ def parse_command_line():
 			if not ( set(inj_channel_dict[nodes].keys()) == set(channel_dict.keys()) ):
 				raise ValueError("Either no injection jobs must be given or the injection and non-injection channels must be specified for the same set of detectors")
 
-	options.state_vector_on_off_dict = datasource.state_vector_on_off_dict_from_bit_lists(options.state_vector_on_bits, options.state_vector_off_bits)
-	options.dq_vector_on_off_dict = datasource.state_vector_on_off_dict_from_bit_lists(options.dq_vector_on_bits, options.dq_vector_off_bits)
+	options.state_vector_on_off_dict = datasourceinfo.state_vector_on_off_bits
+	options.dq_vector_on_off_dict = datasourceinfo.dq_vector_on_off_bits
 
 	options.likelihood_files = [CacheEntry(line).url for line in open(options.likelihood_cache)]
 	options.zerolag_likelihood_files = [CacheEntry(line).url for line in open(options.zerolag_likelihood_cache)]
 
-	return options, filenames, bankcache, channel_dict, dq_channel_dict, state_channel_dict, framexmit_dict, inj_channel_dict, inj_dq_channel_dict, inj_state_channel_dict, inj_framexmit_dict, inj_name_dict, inj_range_dict
+	return options, filenames, bankcache, channel_dict, dq_channel_dict, state_channel_dict, framexmit_dict, shared_memory_partition_dict, inj_channel_dict, inj_dq_channel_dict, inj_state_channel_dict, inj_framexmit_dict, inj_name_dict, inj_range_dict
 
 
 #
@@ -355,7 +302,7 @@ def parse_command_line():
 #
 
 
-options, filenames, bank_cache, channel_dict, dq_channel_dict, state_channel_dict, framexmit_dict, inj_channel_dict, inj_dq_channel_dict, inj_state_channel_dict, inj_framexmit_dict, inj_name_dict, inj_range_dict = parse_command_line()
+options, filenames, bank_cache, channel_dict, dq_channel_dict, state_channel_dict, framexmit_dict, shared_memory_partition_dict, inj_channel_dict, inj_dq_channel_dict, inj_state_channel_dict, inj_framexmit_dict, inj_name_dict, inj_range_dict = parse_command_line()
 
 try: os.mkdir("logs")
 except: pass
@@ -413,8 +360,19 @@ for ifo in channel_dict:
 		os.makedirs(outpath)
 	except OSError:
 		pass
-	inspiral_pipe.generic_node(dqJob, dag, [],
-	opts = {"psd-fft-length":options.psd_fft_length,
+
+	# Data source dag options
+	if (options.data_source == "framexmit"):
+		datasource_opts = {"framexmit-addr":datasource.framexmit_list_from_framexmit_dict({ifo: framexmit_dict[ifo]}),
+			"framexmit-iface":options.framexmit_iface
+			}
+	else :
+		datasource_opts = {"shared-memory-partition":datasource.pipeline_channel_list_from_channel_dict({ifo: shared_memory_partition_dict[ifo]}),
+			"shared-memory-block-size":options.shared_memory_block_size,
+			"shared-memory-assumed-duration":options.shared_memory_assumed_duration
+			}
+
+	common_opts = {"psd-fft-length":options.psd_fft_length,
 		"channel-name":datasource.pipeline_channel_list_from_channel_dict({ifo: channel_dict[ifo]}),
 		"state-channel-name":datasource.pipeline_channel_list_from_channel_dict({ifo: state_channel_dict[ifo]}, opt = "state-channel-name"),
 		"dq-channel-name":datasource.pipeline_channel_list_from_channel_dict({ifo: dq_channel_dict[ifo]}, opt = "dq-channel-name"),
@@ -422,12 +380,11 @@ for ifo in channel_dict:
 		"state-vector-off-bits":options.state_vector_off_bits,
 		"dq-vector-on-bits":options.dq_vector_on_bits,
 		"dq-vector-off-bits":options.dq_vector_off_bits,
-		"framexmit-addr":datasource.framexmit_list_from_framexmit_dict({ifo: framexmit_dict[ifo]}),
-		"framexmit-iface":options.framexmit_iface,
 		"data-source":options.data_source,
 		"out-path": outpath
 		}
-	)
+	common_opts.update(datasource_opts)
+	inspiral_pipe.generic_node(dqJob, dag, [], opts = common_opts)
 
 #
 # loop over banks to run gstlal inspiral pre clustering and far computation
@@ -460,39 +417,51 @@ for num_insp_nodes, (svd_banks, likefile, zerolikefile) in enumerate(zip(bank_gr
 		if options.ht_gate_threshold is not None:
 			threshold_values = [options.ht_gate_threshold] * len(svd_banks.items()[0][1]) # Use the ht-gate-threshold value given
 
+	# Data source dag options
+	if (options.data_source == "framexmit"):
+		datasource_opts = {"framexmit-addr":datasource.framexmit_list_from_framexmit_dict(framexmit_dict),
+			"framexmit-iface":options.framexmit_iface
+			}
+	else :
+		datasource_opts = {"shared-memory-partition":datasource.pipeline_channel_list_from_channel_dict(shared_memory_partition_dict, opt = "shared-memory-partition"),
+			"shared-memory-block-size":options.shared_memory_block_size,
+			"shared-memory-assumed-duration":options.shared_memory_assumed_duration
+			}
+
+	common_opts = {"psd-fft-length":options.psd_fft_length,
+		"reference-psd":options.reference_psd,
+		"ht-gate-threshold":threshold_values,
+		"channel-name":datasource.pipeline_channel_list_from_channel_dict(channel_dict),
+		"state-channel-name":datasource.pipeline_channel_list_from_channel_dict(state_channel_dict, opt = "state-channel-name"),
+		"dq-channel-name":datasource.pipeline_channel_list_from_channel_dict(dq_channel_dict, opt = "dq-channel-name"),
+		"state-vector-on-bits":options.state_vector_on_bits,
+		"state-vector-off-bits":options.state_vector_off_bits,
+		"dq-vector-on-bits":options.dq_vector_on_bits,
+		"dq-vector-off-bits":options.dq_vector_off_bits,
+		"svd-bank":svd_bank_string,
+		"tmp-space":inspiral_pipe.condor_scratch_space(),
+		"track-psd":"",
+		"control-peak-time":options.control_peak_time,
+		"coincidence-threshold":options.coincidence_threshold,
+		"fir-stride":options.fir_stride,
+		"data-source":options.data_source,
+		"gracedb-far-threshold":options.gracedb_far_threshold,
+		"gracedb-group":options.gracedb_group,
+		"gracedb-pipeline":options.gracedb_pipeline,
+		"gracedb-search":options.gracedb_search,
+		"gracedb-service-url":options.gracedb_service_url,
+		"thinca-interval":options.thinca_interval,
+		"job-tag":jobTags[-1],
+		"likelihood-snapshot-interval":options.likelihood_snapshot_interval,
+		"min-instruments":options.min_instruments,
+		"min-log-L":options.min_log_L,
+		"time-slide-file":options.time_slide_file,
+		"output-kafka-server": options.output_kafka_server
+		}
+	common_opts.update(datasource_opts)
+
 	inspNode = inspiral_pipe.generic_node(gstlalInspiralJob, dag, [],
-		opts = {"psd-fft-length":options.psd_fft_length,
-			"reference-psd":options.reference_psd,
-			"ht-gate-threshold":threshold_values,
-			"channel-name":datasource.pipeline_channel_list_from_channel_dict(channel_dict),
-			"state-channel-name":datasource.pipeline_channel_list_from_channel_dict(state_channel_dict, opt = "state-channel-name"),
-			"dq-channel-name":datasource.pipeline_channel_list_from_channel_dict(dq_channel_dict, opt = "dq-channel-name"),
-			"state-vector-on-bits":options.state_vector_on_bits,
-			"state-vector-off-bits":options.state_vector_off_bits,
-			"dq-vector-on-bits":options.dq_vector_on_bits,
-			"dq-vector-off-bits":options.dq_vector_off_bits,
-			"framexmit-addr":datasource.framexmit_list_from_framexmit_dict(framexmit_dict),
-			"framexmit-iface":options.framexmit_iface,
-			"svd-bank":svd_bank_string,
-			"tmp-space":inspiral_pipe.condor_scratch_space(),
-			"track-psd":"",
-			"control-peak-time":options.control_peak_time,
-			"coincidence-threshold":options.coincidence_threshold,
-			"fir-stride":options.fir_stride,
-			"data-source":options.data_source,
-			"gracedb-far-threshold":options.gracedb_far_threshold,
-			"gracedb-group":options.gracedb_group,
-			"gracedb-pipeline":options.gracedb_pipeline,
-			"gracedb-search":options.gracedb_search,
-			"gracedb-service-url":options.gracedb_service_url,
-			"thinca-interval":options.thinca_interval,
-			"job-tag":jobTags[-1],
-			"likelihood-snapshot-interval":options.likelihood_snapshot_interval,
-			"min-instruments":options.min_instruments,
-			"min-log-L":options.min_log_L,
-			"time-slide-file":options.time_slide_file,
-			"output-kafka-server": options.output_kafka_server
-		},
+		opts = common_opts,
 		input_files = {
 			"ranking-stat-input":[likefile],
 			"ranking-stat-pdf":options.marginalized_likelihood_file
@@ -511,38 +480,50 @@ for num_insp_nodes, (svd_banks, likefile, zerolikefile) in enumerate(zip(bank_gr
 		# do this in the future, this system was just the simplest to start
 		# with
 		inj_jobTags.append("%04d" % (num_insp_nodes + 1000))
+
+		# Data source dag options
+		if (options.data_source == "framexmit"):
+			datasource_opts = {"framexmit-addr":datasource.framexmit_list_from_framexmit_dict({ifo: framexmit_dict[ifo]}),
+				"framexmit-iface":options.framexmit_iface
+				}
+		else :
+			datasource_opts = {"shared-memory-partition":datasource.pipeline_channel_list_from_channel_dict({ifo: shared_memory_partition_dict[ifo]}),
+				"shared-memory-block-size":options.shared_memory_block_size,
+				"shared-memory-assumed-duration":options.shared_memory_assumed_duration
+				}
+
+		common_opts = {"psd-fft-length":options.psd_fft_length,
+			"reference-psd":options.reference_psd,
+			"ht-gate-threshold":threshold_values,
+			"channel-name":datasource.pipeline_channel_list_from_channel_dict_with_node_range(inj_channel_dict, node = jobTags[-1]),
+			"state-channel-name":datasource.pipeline_channel_list_from_channel_dict(inj_state_channel_dict, opt = "state-channel-name"),
+			"dq-channel-name":datasource.pipeline_channel_list_from_channel_dict(inj_dq_channel_dict, opt = "dq-channel-name"),
+			"state-vector-on-bits":options.inj_state_vector_on_bits,
+			"state-vector-off-bits":options.inj_state_vector_off_bits,
+			"dq-vector-on-bits":options.inj_dq_vector_on_bits,
+			"dq-vector-off-bits":options.inj_dq_vector_off_bits,
+			"svd-bank":svd_bank_string,
+			"tmp-space":inspiral_pipe.condor_scratch_space(),
+			"track-psd":"",
+			"control-peak-time":options.control_peak_time,
+			"coincidence-threshold":options.coincidence_threshold,
+			"fir-stride":options.fir_stride,
+			"data-source":options.data_source,
+			"gracedb-far-threshold":options.inj_gracedb_far_threshold,
+			"gracedb-group":options.inj_gracedb_group,
+			"gracedb-pipeline":options.inj_gracedb_pipeline,
+			"gracedb-search":options.inj_gracedb_search,
+			"gracedb-service-url":options.inj_gracedb_service_url,
+			"thinca-interval":options.thinca_interval,
+			"job-tag":inj_jobTags[-1],
+			"likelihood-snapshot-interval":options.likelihood_snapshot_interval,
+			"min-instruments":options.min_instruments,
+			"min-log-L":options.min_log_L,
+			"time-slide-file":options.time_slide_file
+			}
+		common_opts.update(datasource_opts)
 		inspInjNode = inspiral_pipe.generic_node(gstlalInspiralInjJob, dag, [],
-			opts = {"psd-fft-length":options.psd_fft_length,
-				"reference-psd":options.reference_psd,
-				"ht-gate-threshold":threshold_values,
-				"channel-name":datasource.pipeline_channel_list_from_channel_dict_with_node_range(inj_channel_dict, node = jobTags[-1]),
-				"state-channel-name":datasource.pipeline_channel_list_from_channel_dict(inj_state_channel_dict, opt = "state-channel-name"),
-				"dq-channel-name":datasource.pipeline_channel_list_from_channel_dict(inj_dq_channel_dict, opt = "dq-channel-name"),
-				"state-vector-on-bits":options.inj_state_vector_on_bits,
-				"state-vector-off-bits":options.inj_state_vector_off_bits,
-				"dq-vector-on-bits":options.inj_dq_vector_on_bits,
-				"dq-vector-off-bits":options.inj_dq_vector_off_bits,
-				"framexmit-addr":datasource.framexmit_list_from_framexmit_dict(inj_framexmit_dict),
-				"framexmit-iface":options.inj_framexmit_iface,
-				"svd-bank":svd_bank_string,
-				"tmp-space":inspiral_pipe.condor_scratch_space(),
-				"track-psd":"",
-				"control-peak-time":options.control_peak_time,
-				"coincidence-threshold":options.coincidence_threshold,
-				"fir-stride":options.fir_stride,
-				"data-source":options.data_source,
-				"gracedb-far-threshold":options.inj_gracedb_far_threshold,
-				"gracedb-group":options.inj_gracedb_group,
-				"gracedb-pipeline":options.inj_gracedb_pipeline,
-				"gracedb-search":options.inj_gracedb_search,
-				"gracedb-service-url":options.inj_gracedb_service_url,
-				"thinca-interval":options.thinca_interval,
-				"job-tag":inj_jobTags[-1],
-				"likelihood-snapshot-interval":options.likelihood_snapshot_interval,
-				"min-instruments":options.min_instruments,
-				"min-log-L":options.min_log_L,
-				"time-slide-file":options.time_slide_file
-			},
+			opts = common_opts,
 			input_files = {
 				"ranking-stat-input":[likefile],
 				"ranking-stat-pdf":options.marginalized_likelihood_file
@@ -583,8 +564,6 @@ else:
 	pageNode = inspiral_pipe.generic_node(pageJob, dag, [], opts = {"directory":".", "web-dir": options.web_dir}, input_files = {"":jobTags}, output_files = {})
 
 
-	
-
 if options.state_backup_destination:
 	stateNode = inspiral_pipe.generic_node(stateJob, dag, [], opts = {}, input_files = {"":[options.state_backup_destination, options.marginalized_likelihood_file] + options.likelihood_files}, output_files = {})
 
diff --git a/gstlal/python/datasource.py b/gstlal/python/datasource.py
index 4cb9d2c53b..89746fdee1 100644
--- a/gstlal/python/datasource.py
+++ b/gstlal/python/datasource.py
@@ -444,7 +444,7 @@ class GWDataSourceInfo(object):
 		self.channel_dict = channel_dict_from_channel_list(options.channel_name)
 
 		## A dictionary for shared memory partition, e.g., {"H1": "LHO_Data", "H2": "LHO_Data", "L1": "LLO_Data", "V1": "VIRGO_Data"}
-		self.shm_part_dict = {"H1": "LHO_Data", "H2": "LHO_Data", "L1": "LLO_Data", "V1": "VIRGO_Data"}
+		self.shm_part_dict = {"H1": "LHO_Data", "L1": "LLO_Data", "V1": "VIRGO_Data"}
 		if options.shared_memory_partition is not None:
 			self.shm_part_dict.update( channel_dict_from_channel_list(options.shared_memory_partition) )
 
-- 
GitLab